/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides functionality to write ({@link BlockIndexWriter}) and read
 * ({@link BlockIndexReader}) single-level and multi-level block indexes.
 *
 * Examples of how to use the block index writer can be found in
 * {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
 * {@link HFileWriterImpl}. Examples of how to use the reader can be
 * found in {@link HFileReaderImpl} and
 * org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex.
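 *
 * <p>A hedged usage sketch of the writer side ({@code blockWriter}, {@code cacheConf},
 * {@code fileName} and {@code out} stand in for the wiring done by {@link HFileWriterImpl}):
 * <pre>{@code
 * HFileBlockIndex.BlockIndexWriter indexWriter =
 *     new HFileBlockIndex.BlockIndexWriter(blockWriter, cacheConf, fileName);
 * indexWriter.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
 * long rootIndexOffset = indexWriter.writeIndexBlocks(out);
 * }</pre>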
 */
@InterfaceAudience.Private
public class HFileBlockIndex {

  private static final Logger LOG = LoggerFactory.getLogger(HFileBlockIndex.class);

  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;

  /**
   * The maximum size guideline for index blocks (leaf, intermediate, and
   * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
   */
  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";

  /**
   * Minimum number of entries in a single index block. Even if we are above the
   * hfile.index.block.max.size we will keep writing to the same block unless we have that many
   * entries. We should have at least a few entries so that we don't have too many levels in the
   * multi-level index. This should be at least 2 to make sure there is no infinite recursion.
   */
  public static final String MIN_INDEX_NUM_ENTRIES_KEY = "hfile.index.block.min.entries";

  static final int DEFAULT_MIN_INDEX_NUM_ENTRIES = 16;
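
  // A hedged configuration sketch: both keys are plain int properties on a
  // Hadoop Configuration (the values here are illustrative, not recommendations):
  //   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
  //   conf.setInt(HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY, 16);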

  /**
   * The number of bytes stored in each "secondary index" entry in addition to
   * key bytes in the non-root index block format. The first long is the file
   * offset of the deeper-level block the entry points to, and the int that
   * follows is that block's on-disk size without including header.
   */
  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
      + Bytes.SIZEOF_LONG;
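
  // A sketch of the non-root index block layout implied by the arithmetic in
  // binarySearchNonRootIndex and getNonRootIndexedKey below:
  //   int numEntries
  //   int secondaryIndex[numEntries + 1]  // entry offsets relative to the end of this array
  //   for each entry: long blockOffset, int onDiskSize, byte[] firstKey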

  /**
   * Error message when trying to use inline block API in single-level mode.
   */
  private static final String INLINE_BLOCKS_NOT_ALLOWED =
      "Inline blocks are not allowed in the single-level-only mode";

  /**
   * The size of a meta-data record used for finding the mid-key in a
   * multi-level index. Consists of the middle leaf-level index block offset
   * (long), its on-disk size without header included (int), and the mid-key
   * entry's zero-based index in that leaf index block.
   */
  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
      2 * Bytes.SIZEOF_INT;

  /**
   * An implementation of the BlockIndexReader that deals with block keys which are plain
   * byte[], like the meta block index or the bloom block index for ROW blooms.
   * Does not need a comparator; it can work with Bytes.BYTES_RAWCOMPARATOR.
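   *
   * <p>A hedged lookup sketch ({@code in} and {@code numEntries} come from the
   * opened file's context and are illustrative here):
   * <pre>{@code
   * ByteArrayKeyBlockIndexReader idx = new ByteArrayKeyBlockIndexReader(1);
   * idx.readRootIndex(in, numEntries);
   * int blockIndex = idx.rootBlockContainingKey(key, 0, key.length);
   * }</pre>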
   */
  static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {

    private byte[][] blockKeys;

    public ByteArrayKeyBlockIndexReader(final int treeLevel,
        final CachingBlockReader cachingBlockReader) {
      this(treeLevel);
      this.cachingBlockReader = cachingBlockReader;
    }

    public ByteArrayKeyBlockIndexReader(final int treeLevel) {
      // Can be null for METAINDEX block
      searchTreeLevel = treeLevel;
    }

    @Override
    protected long calculateHeapSizeForBlockKeys(long heapSize) {
      // Calculating the size of blockKeys
      if (blockKeys != null) {
        heapSize += ClassSize.REFERENCE;
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);

        // Adding bytes
        for (byte[] key : blockKeys) {
          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
        }
      }
      return heapSize;
    }

    @Override
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public byte[] getRootBlockKey(int i) {
      return blockKeys[i];
    }

    @Override
    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks, boolean pread, boolean isCompaction,
        DataBlockEncoding expectedDataBlockEncoding) throws IOException {
      // Not needed for a byte[] keyed index; lookups go through rootBlockContainingKey.
      return null;
    }

    @Override
    public Cell midkey() throws IOException {
      // Not needed here
      return null;
    }

    @Override
    protected void initialize(int numEntries) {
      blockKeys = new byte[numEntries][];
    }

    @Override
    protected void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      blockKeys[rootCount] = key;
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    @Override
    public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
      int pos = Bytes.binarySearch(blockKeys, key, offset, length);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.
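
      // Worked example (illustrative, not from the source): for blockKeys =
      // {"b", "d"} and key "c", binarySearch returns -(1 + 1) = -2, so
      // i = 1 and we return j = 0: the block starting at "b" may contain "c".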

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    @Override
    public int rootBlockContainingKey(Cell key) {
      // Should not be called on this because here it deals only with byte[]
      throw new UnsupportedOperationException(
          "Cannot search for a key that is of Cell type. Only plain byte array keys " +
          "can be searched for");
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=").append(rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
            .append("\n  offset=").append(blockOffsets[i])
            .append(", dataSize=").append(blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }

  }

  /**
   * An implementation of the BlockIndexReader that deals with block keys which are the key
   * part of a cell, such as the data block index or the ROW_COL bloom blocks.
   * This needs a comparator to work with the Cells.
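   *
   * <p>A hedged usage sketch ({@code reader} and {@code numLevels} come from the
   * opened file's context and are illustrative here):
   * <pre>{@code
   * CellBasedKeyBlockIndexReader idx =
   *     new CellBasedKeyBlockIndexReader(CellComparator.getInstance(), numLevels, reader);
   * BlockWithScanInfo info =
   *     idx.loadDataBlockWithScanInfo(key, null, true, true, false, null);
   * }</pre>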
239   */
240   static class CellBasedKeyBlockIndexReader extends BlockIndexReader {
241
242    private Cell[] blockKeys;
243    /** Pre-computed mid-key */
244    private AtomicReference<Cell> midKey = new AtomicReference<>();
245    /** Needed doing lookup on blocks. */
246    private CellComparator comparator;
247
248    public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel,
249        final CachingBlockReader cachingBlockReader) {
250      this(c, treeLevel);
251      this.cachingBlockReader = cachingBlockReader;
252    }
253
254    public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
255      // Can be null for METAINDEX block
256      comparator = c;
257      searchTreeLevel = treeLevel;
258    }

    @Override
    protected long calculateHeapSizeForBlockKeys(long heapSize) {
      if (blockKeys != null) {
        heapSize += ClassSize.REFERENCE;
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);

        // Adding blockKeys
        for (Cell key : blockKeys) {
          heapSize += ClassSize.align(key.heapSize());
        }
      }
      // Add comparator and the midkey atomicreference
      heapSize += 2 * ClassSize.REFERENCE;
      return heapSize;
    }

    @Override
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public Cell getRootBlockKey(int i) {
      return blockKeys[i];
    }

    @Override
    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks, boolean pread, boolean isCompaction,
        DataBlockEncoding expectedDataBlockEncoding) throws IOException {
      int rootLevelIndex = rootBlockContainingKey(key);
      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
        return null;
      }

      // the next indexed key
      Cell nextIndexedKey = null;

      // Read the next-level (intermediate or leaf) index block.
      long currentOffset = blockOffsets[rootLevelIndex];
      int currentOnDiskSize = blockDataSizes[rootLevelIndex];

      if (rootLevelIndex < blockKeys.length - 1) {
        nextIndexedKey = blockKeys[rootLevelIndex + 1];
      } else {
        nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
      }

      int lookupLevel = 1; // How many levels deep we are in our lookup.
      int index = -1;

      HFileBlock block = null;
      KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
      while (true) {
        try {
          // Must initialize it with null here, because if we don't and an exception happens in
          // readBlock, then we'll release the previously assigned block twice in the finally
          // block. (See HBASE-22422)
          block = null;
          if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
            // Avoid reading the same block again, even with caching turned off.
            // This is crucial for compaction-type workloads which might have
            // caching turned off. This is like a one-block cache inside the
            // scanner.
            block = currentBlock;
          } else {
            // Call HFile's caching block reader API. We always cache index
            // blocks, otherwise we might get terrible performance.
            boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
            BlockType expectedBlockType;
            if (lookupLevel < searchTreeLevel - 1) {
              expectedBlockType = BlockType.INTERMEDIATE_INDEX;
            } else if (lookupLevel == searchTreeLevel - 1) {
              expectedBlockType = BlockType.LEAF_INDEX;
            } else {
              // this also accounts for ENCODED_DATA
              expectedBlockType = BlockType.DATA;
            }
            block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache,
              pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
          }

          if (block == null) {
            throw new IOException("Failed to read block at offset " + currentOffset
                + ", onDiskSize=" + currentOnDiskSize);
          }

          // Found a data block, break the loop and check our level in the tree.
          if (block.getBlockType().isData()) {
            break;
          }

          // Not a data block. This must be a leaf-level or intermediate-level
          // index block. We don't allow going deeper than searchTreeLevel.
          if (++lookupLevel > searchTreeLevel) {
            throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
                + ", searchTreeLevel=" + searchTreeLevel);
          }

          // Locate the entry corresponding to the given key in the non-root
          // (leaf or intermediate-level) index block.
          ByteBuff buffer = block.getBufferWithoutHeader();
          index = locateNonRootIndexEntry(buffer, key, comparator);
          if (index == -1) {
            // This has to be changed
            // For now change this to key value
            throw new IOException("The key "
                + CellUtil.getCellKeyAsString(key)
                + " is before the first key of the non-root index block " + block);
          }

          currentOffset = buffer.getLong();
          currentOnDiskSize = buffer.getInt();

          // Only update next indexed key if there is a next indexed key in the current level
          byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
          if (nonRootIndexedKey != null) {
            tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
            nextIndexedKey = tmpNextIndexKV;
          }
        } finally {
          if (block != null && !block.getBlockType().isData()) {
            // Release the block immediately if it is not the data block
            block.release();
          }
        }
      }

      if (lookupLevel != searchTreeLevel) {
        assert block.getBlockType().isData();
        // Though we have retrieved a data block, the lookup ended at the wrong
        // level, so release the block here to decrement its ref count before
        // we throw.
        if (block != null) {
          block.release();
        }
        throw new IOException("Reached a data block at level " + lookupLevel
            + " but the number of levels is " + searchTreeLevel);
      }

      // set the next indexed key for the current block.
      return new BlockWithScanInfo(block, nextIndexedKey);
    }

    @Override
    public Cell midkey() throws IOException {
      if (rootCount == 0) {
        throw new IOException("HFile empty");
      }

      Cell targetMidKey = this.midKey.get();
      if (targetMidKey != null) {
        return targetMidKey;
      }

      if (midLeafBlockOffset >= 0) {
        if (cachingBlockReader == null) {
          throw new IOException("Have to read the middle leaf block but " +
              "no block reader available");
        }

        // Caching, using pread, assuming this is not a compaction.
        HFileBlock midLeafBlock = cachingBlockReader.readBlock(
            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
            BlockType.LEAF_INDEX, null);
        try {
          ByteBuff b = midLeafBlock.getBufferWithoutHeader();
          int numDataBlocks = b.getIntAfterPosition(0);
          int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
          int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset
              - SECONDARY_INDEX_ENTRY_OVERHEAD;
          int keyOffset =
              Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
                  + SECONDARY_INDEX_ENTRY_OVERHEAD;
          byte[] bytes = b.toBytes(keyOffset, keyLen);
          targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
        } finally {
          midLeafBlock.release();
        }
      } else {
        // The middle of the root-level index.
        targetMidKey = blockKeys[rootCount / 2];
      }

      this.midKey.set(targetMidKey);
      return targetMidKey;
    }

    @Override
    protected void initialize(int numEntries) {
      blockKeys = new Cell[numEntries];
    }

    /**
     * Adds a new entry in the root block index. Only used when reading.
     *
     * @param key Last key in the block
     * @param offset file offset where the block is stored
     * @param dataSize the uncompressed data size
     */
    @Override
    protected void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      // Create the blockKeys as Cells once when the reader is opened
      blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    @Override
    public int rootBlockContainingKey(final byte[] key, int offset, int length,
        CellComparator comp) {
      // This should always be called with a Cell, not with a plain byte[] key
      throw new UnsupportedOperationException("Cannot search for a plain byte array " +
          "key. Only Cell-based keys can be searched for");
    }

    @Override
    public int rootBlockContainingKey(Cell key) {
      // The comparator should not be null here, since this lookup is only done
      // on root-level blocks of indexes that are keyed by Cells.
      int pos = Bytes.binarySearch(blockKeys, key, comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=").append(rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(blockKeys[i])
            .append("\n  offset=").append(blockOffsets[i])
            .append(", dataSize=").append(blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }
  }

  /**
   * The reader will always hold the root level index in the memory. Index
   * blocks at all other levels will be cached in the LRU cache in practice,
   * although this API does not enforce that.
   *
   * <p>All non-root (leaf and intermediate) index blocks contain what we call a
   * "secondary index": an array of offsets to the entries within the block.
   * This allows us to do binary search for the entry corresponding to the
   * given key without having to deserialize the block.
   */
  abstract static class BlockIndexReader implements HeapSize {

    protected long[] blockOffsets;
    protected int[] blockDataSizes;
    protected int rootCount = 0;

    // Mid-key metadata.
    protected long midLeafBlockOffset = -1;
    protected int midLeafBlockOnDiskSize = -1;
    protected int midKeyEntry = -1;

    /**
     * The number of levels in the block index tree. One if there is only root
     * level, two for root and leaf levels, etc.
     */
    protected int searchTreeLevel;

    /** A way to read {@link HFile} blocks at a given offset */
    protected CachingBlockReader cachingBlockReader;

    /**
     * @return true if the block index is empty.
     */
    public abstract boolean isEmpty();

    /**
     * Verifies that the block index is non-empty and throws an
     * {@link IllegalStateException} otherwise.
     */
    public void ensureNonEmpty() {
      if (isEmpty()) {
        throw new IllegalStateException("Block index is empty or not loaded");
      }
    }

    /**
     * Return the data block which contains this key. This function will only
     * be called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param cacheBlocks whether the blocks read should be added to the block cache
     * @param pread whether to use positional (pread) style reads
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding
     * @return the data block containing the given key, or null if not found
     * @throws IOException
     */
    public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException {
      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
        cacheBlocks, pread, isCompaction, expectedDataBlockEncoding);
      if (blockWithScanInfo == null) {
        return null;
      } else {
        return blockWithScanInfo.getHFileBlock();
      }
    }

    /**
     * Return the BlockWithScanInfo, a data structure which contains the Data HFileBlock with
     * other scan info such as the key that starts the next HFileBlock. This function will only
     * be called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding.
     * @return the BlockWithScanInfo which contains the DataBlock with other
     *         scan info such as nextIndexedKey.
     * @throws IOException
     */
    public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks, boolean pread, boolean isCompaction,
        DataBlockEncoding expectedDataBlockEncoding) throws IOException;

    /**
     * An approximation to the {@link HFile}'s mid-key. Operates on block
     * boundaries, and does not go inside blocks. In other words, returns the
     * first key of the middle block of the file.
     *
     * @return the first key of the middle block
     */
    public abstract Cell midkey() throws IOException;

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public long getRootBlockOffset(int i) {
      return blockOffsets[i];
    }

    /**
     * @param i zero-based index of a root-level block
     * @return the on-disk size of the root-level block for version 2, or the
     *         uncompressed size for version 1
     */
    public int getRootBlockDataSize(int i) {
      return blockDataSizes[i];
    }

    /**
     * @return the number of root-level blocks in this block index
     */
    public int getRootBlockCount() {
      return rootCount;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key Key to find
     * @param comp the comparator to be used
     * @return Index of the root-level block containing <code>key</code> (between 0 and
     *         the number of blocks - 1) or -1 if this file does not contain the
     *         request.
     */
    // When we want to find the meta index block or bloom block for ROW bloom
    // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
    // CellComparator.
    public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
        CellComparator comp);

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key Key to find
     * @return Index of the root-level block containing <code>key</code> (between 0 and
     *         the number of blocks - 1) or -1 if this file does not contain the
     *         request.
     */
    // When we want to find the meta index block or bloom block for ROW bloom
    // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we
    // need the CellComparator.
    public int rootBlockContainingKey(final byte[] key, int offset, int length) {
      return rootBlockContainingKey(key, offset, length, null);
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key Key to find
     */
    public abstract int rootBlockContainingKey(final Cell key);

    /**
     * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
     * @param nonRootIndex the non-root index block buffer, starting with the secondary index
     * @param i the ith position
     * @return The indexed key at the ith position in the nonRootIndex.
     */
    protected byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
      int numEntries = nonRootIndex.getInt(0);
      if (i < 0 || i >= numEntries) {
        return null;
      }

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
      // The target key's offset relative to the end of secondary index
      int targetKeyRelOffset = nonRootIndex.getInt(
          Bytes.SIZEOF_INT * (i + 1));

      // The offset of the target key in the blockIndex buffer
      int targetKeyOffset = entriesOffset     // Skip secondary index
          + targetKeyRelOffset               // Skip all entries until mid
          + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

      // We subtract the two consecutive secondary index elements, which
      // gives us the size of the whole (offset, onDiskSize, key) tuple. We
      // then need to subtract the overhead of offset and onDiskSize.
      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
        targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

      // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
      return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
    }
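
    // Worked example of the arithmetic above (illustrative numbers): with
    // numEntries = 2, the secondary index holds numEntries + 1 = 3 ints, so
    // entriesOffset = 4 * (2 + 2) = 16. If entry #1 starts at relative offset
    // 25, its key begins at 16 + 25 + 12 = 53, where 12 is
    // SECONDARY_INDEX_ENTRY_OVERHEAD (a long offset plus an int size).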

    /**
     * Performs a binary search over a non-root level index block. Utilizes the
     * secondary index, which records the offsets of (offset, onDiskSize,
     * firstKey) tuples of all entries.
     *
     * @param key the key we are searching for offsets to individual entries in
     *          the blockIndex buffer
     * @param nonRootIndex the non-root index block buffer, starting with the secondary
     *          index. The position is ignored.
     * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
     *         keys[i + 1], if keys is the array of all keys being searched, or
     *         -1 otherwise
     * @throws IOException
     */
    static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
        CellComparator comparator) {

      int numEntries = nonRootIndex.getIntAfterPosition(0);
      int low = 0;
      int high = numEntries - 1;
      int mid = 0;

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // If we imagine that keys[-1] = -Infinity and
      // keys[numEntries] = Infinity, then we are maintaining an invariant that
      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
      ByteBufferKeyOnlyKeyValue nonRootIndexkeyOnlyKV = new ByteBufferKeyOnlyKeyValue();
      ObjectIntPair<ByteBuffer> pair = new ObjectIntPair<>();
      while (low <= high) {
        mid = low + ((high - low) >> 1);

        // The mid key's offset relative to the end of secondary index
        int midKeyRelOffset = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 1));

        // The offset of the middle key in the blockIndex buffer
        int midKeyOffset = entriesOffset       // Skip secondary index
            + midKeyRelOffset                  // Skip all entries until mid
            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

        // We subtract the two consecutive secondary index elements, which
        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
        // then need to subtract the overhead of offset and onDiskSize.
        int midLength = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 2)) -
            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

        // We have to compare in this order, because the comparator order
        // has special logic when the 'left side' is a special key.
        // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
        // done after HBASE-12224 & HBASE-12282
        // TODO avoid array call.
        nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
        nonRootIndexkeyOnlyKV.setKey(pair.getFirst(), pair.getSecond(), midLength);
        int cmp = PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, nonRootIndexkeyOnlyKV);

        if (cmp > 0) {
          // key lives above the midpoint
          low = mid + 1; // Maintain the invariant that keys[low - 1] < key
        } else if (cmp < 0) {
          // key lives below the midpoint
          high = mid - 1; // Maintain the invariant that key < keys[high + 1]
        } else {
          return mid; // exact match
        }
      }

      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
      // condition, low >= high + 1. Therefore, low = high + 1.

      if (low != high + 1) {
        throw new IllegalStateException("Binary search broken: low=" + low
            + " instead of " + (high + 1));
      }

      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
      int i = low - 1;

      // Some extra validation on the result.
      if (i < -1 || i >= numEntries) {
        throw new IllegalStateException("Binary search broken: result is " +
            i + " but expected to be between -1 and (numEntries - 1) = " +
            (numEntries - 1));
      }

      return i;
    }

    /**
     * Search for one key using the secondary index in a non-root block. In case
     * of success, positions the provided buffer at the entry of interest, where
     * the file offset and the on-disk-size can be read.
     *
     * @param nonRootBlock a non-root block without header. Initial position does not
     *          matter.
     * @param key the key to search for
     * @return the index position where the given key was found, or -1 if the
     *         given key is before the first key.
     */
    static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
        CellComparator comparator) {
      int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);

      if (entryIndex != -1) {
        int numEntries = nonRootBlock.getIntAfterPosition(0);

        // The end of secondary index and the beginning of entries themselves.
        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

        // The offset of the entry we are interested in relative to the end of
        // the secondary index.
        int entryRelOffset = nonRootBlock
            .getIntAfterPosition(Bytes.SIZEOF_INT * (1 + entryIndex));

        nonRootBlock.position(entriesOffset + entryRelOffset);
      }

      return entryIndex;
    }

    /**
     * Read in the root-level index from the given input stream. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param in the buffered input stream or wrapped byte input stream
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readRootIndex(DataInput in, final int numEntries) throws IOException {
      blockOffsets = new long[numEntries];
      initialize(numEntries);
      blockDataSizes = new int[numEntries];

      // If index size is zero, no index was written.
      if (numEntries > 0) {
        for (int i = 0; i < numEntries; ++i) {
          long offset = in.readLong();
          int dataSize = in.readInt();
          byte[] key = Bytes.readByteArray(in);
          add(key, offset, dataSize);
        }
      }
    }
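
    // A sketch of the root-level entry wire format implied by the reads above
    // (Bytes.readByteArray reads a vint length followed by the key bytes):
    //   long blockOffset, int onDiskDataSize, vint keyLength, byte[] key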

    protected abstract void initialize(int numEntries);

    protected abstract void add(final byte[] key, final long offset, final int dataSize);

    /**
     * Read in the root-level index from the given input stream. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @return the buffered input stream or wrapped byte input stream
     * @throws IOException
     */
    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
      DataInputStream in = blk.getByteStream();
      readRootIndex(in, numEntries);
      return in;
    }

    /**
     * Read the root-level metadata of a multi-level block index. Based on
     * {@link #readRootIndex(DataInput, int)}, but also reads metadata
     * necessary to compute the mid-key in a multi-level index.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readMultiLevelIndexRoot(HFileBlock blk,
        final int numEntries) throws IOException {
      DataInputStream in = readRootIndex(blk, numEntries);
      // After reading the root index, the checksum bytes have to be subtracted
      // from what remains available to know whether the mid-key metadata exists.
      int checkSumBytes = blk.totalChecksumBytes();
      if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
        // No mid-key metadata available.
        return;
      }
      midLeafBlockOffset = in.readLong();
      midLeafBlockOnDiskSize = in.readInt();
      midKeyEntry = in.readInt();
    }

    @Override
    public long heapSize() {
      // The base class accounts for its object header, three references
      // (blockOffsets, blockDataSizes, cachingBlockReader) and two ints
      // (rootCount, searchTreeLevel); subclasses account for their own
      // blockKeys, comparator and mid-key fields in
      // calculateHeapSizeForBlockKeys.
      long heapSize = ClassSize.align(3 * ClassSize.REFERENCE +
          2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);

      // Mid-key metadata.
      heapSize += MID_KEY_METADATA_SIZE;

      heapSize = calculateHeapSizeForBlockKeys(heapSize);

      if (blockOffsets != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
            * Bytes.SIZEOF_LONG);
      }

      if (blockDataSizes != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
            * Bytes.SIZEOF_INT);
      }

      return ClassSize.align(heapSize);
    }

    protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
  }

  /**
   * Writes the block index into the output stream. Generates the tree from
   * bottom up. The leaf level is written to disk as a sequence of inline
   * blocks, if it is larger than a certain number of bytes. If the leaf level
   * is not large enough, we write all entries to the root level instead.
   *
   * After all leaf blocks have been written, we end up with an index
   * referencing the resulting leaf index blocks. If that index is larger than
   * the allowed root index size, the writer will break it up into
   * reasonable-size intermediate-level index block chunks, write those chunks
   * out, and create another index referencing those chunks. This will be
   * repeated until the remaining index is small enough to become the root
   * index. However, in most practical cases we will only have leaf-level
   * blocks and the root index, or just the root index.
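   *
   * <p>A hedged sketch of the inline-block protocol this writer takes part in,
   * normally driven by the HFile writer ({@code fsOut}, {@code offset} and the
   * size arguments are illustrative):
   * <pre>{@code
   * if (indexWriter.shouldWriteBlock(false)) {
   *   DataOutput out = blockWriter.startWriting(indexWriter.getInlineBlockType());
   *   indexWriter.writeInlineBlock(out);
   *   blockWriter.writeHeaderAndData(fsOut);
   *   indexWriter.blockWritten(offset, onDiskSize, uncompressedSize);
   * }
   * }</pre>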
   */
  public static class BlockIndexWriter implements InlineBlockWriter {
    /**
     * While the index is being written, this represents the current block
     * index referencing all leaf blocks, with one exception. If the file is
     * being closed and there are not enough blocks to complete even a single
     * leaf block, no leaf blocks get written and this contains the entire
     * block index. After all levels of the index have been written by
     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
     * root-level index.
     */
    private BlockIndexChunk rootChunk = new BlockIndexChunk();

    /**
     * Current leaf-level chunk. New entries referencing data blocks get added
     * to this chunk until it grows large enough to be written to disk.
     */
    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();

    /**
     * The number of block index levels. This is one if there is only the root
     * level (even empty), two if there is a leaf level and a root level, and
     * higher if there are intermediate levels. This is only final after
     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
     * initial value accounts for the root level, and will be increased to two
     * as soon as we find out there is a leaf-level in
     * {@link #blockWritten(long, int, int)}.
     */
    private int numLevels = 1;

    private HFileBlock.Writer blockWriter;
    private byte[] firstKey = null;

    /**
     * The total number of leaf-level entries, i.e. entries referenced by
     * leaf-level blocks. For the data block index this is equal to the number
     * of data blocks.
     */
    private long totalNumEntries;

    /** Total compressed size of all index blocks. */
    private long totalBlockOnDiskSize;

    /** Total uncompressed size of all index blocks. */
    private long totalBlockUncompressedSize;

    /** The maximum size guideline of all multi-level index blocks. */
    private int maxChunkSize;

    /** The minimum number of entries in any multi-level index block. */
    private int minIndexNumEntries;

    /** Whether we require this block index to always be single-level. */
    private boolean singleLevelOnly;

    /** CacheConfig, or null if cache-on-write is disabled */
    private CacheConfig cacheConf;

    /** Name to use for computing cache keys */
    private String nameForCaching;

    /** Creates a single-level block index writer */
    public BlockIndexWriter() {
      this(null, null, null);
      singleLevelOnly = true;
    }

    /**
     * Creates a multi-level block index writer.
     *
     * @param blockWriter the block writer to use to write index blocks
     * @param cacheConf used to determine when and how a block should be cached-on-write.
     */
    public BlockIndexWriter(HFileBlock.Writer blockWriter,
        CacheConfig cacheConf, String nameForCaching) {
      if ((cacheConf == null) != (nameForCaching == null)) {
        throw new IllegalArgumentException("Block cache and file name for " +
            "caching must be both specified or both null");
      }

      this.blockWriter = blockWriter;
      this.cacheConf = cacheConf;
      this.nameForCaching = nameForCaching;
      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
      this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
    }

    public void setMaxChunkSize(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("Invalid maximum index block size");
      }
      this.maxChunkSize = maxChunkSize;
    }

    public void setMinIndexNumEntries(int minIndexNumEntries) {
      if (minIndexNumEntries <= 1) {
        throw new IllegalArgumentException("Invalid minimum index block entry count, " +
            "should be >= 2");
      }
      this.minIndexNumEntries = minIndexNumEntries;
    }

    /**
     * Writes the root level and intermediate levels of the block index into
     * the output stream, generating the tree from bottom up. Assumes that the
     * leaf level has been inline-written to the disk if there is enough data
     * for more than one leaf block. We iterate by breaking the current level
     * of the block index, starting with the index of all leaf-level blocks,
     * into chunks small enough to be written to disk, and generate its parent
     * level, until we end up with a level small enough to become the root
     * level.
     *
     * If the leaf level is not large enough, there is no inline block index
     * anymore, so we only write that level of block index to disk as the root
     * level.
     *
     * @param out FSDataOutputStream
     * @return position at which we entered the root-level index.
     * @throws IOException
     */
    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
        throw new IOException("Trying to write a multi-level block index, " +
            "but there are " + curInlineChunk.getNumEntries() + " entries in the " +
            "last inline chunk.");
      }

      // We need to get mid-key metadata before we create intermediate
      // indexes and overwrite the root chunk.
      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
          : null;

      if (curInlineChunk != null) {
        while (rootChunk.getRootSize() > maxChunkSize
            // HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
            && rootChunk.getNumEntries() > minIndexNumEntries
            // Sanity check. We will not hit this: (minIndexNumEntries ^ 16) blocks can be addressed
            && numLevels < 16) {
          rootChunk = writeIntermediateLevel(out, rootChunk);
          numLevels += 1;
        }
      }

      // write the root level
      long rootLevelIndexPos = out.getPos();

      {
        DataOutput blockStream =
            blockWriter.startWriting(BlockType.ROOT_INDEX);
        rootChunk.writeRoot(blockStream);
        if (midKeyMetadata != null) {
          blockStream.write(midKeyMetadata);
        }
        blockWriter.writeHeaderAndData(out);
        if (cacheConf != null) {
          cacheConf.getBlockCache().ifPresent(cache -> {
            HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
            cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true,
                blockForCaching.getBlockType()), blockForCaching);
          });
        }
      }

      // Add root index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
          + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
          + " root-level entries, " + totalNumEntries + " total entries, "
          + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
          " on-disk size, "
          + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
          " total uncompressed size.");
      }
      return rootLevelIndexPos;
    }

    /**
     * Writes the block index data as a single level only. Does not do any
     * block framing.
     *
     * @param out the buffered output stream to write the index to. Typically a
     *          stream writing into an {@link HFile} block.
     * @param description a short description of the index being written. Used
     *          in a log message.
     * @throws IOException
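     *
     * <p>A hedged usage sketch (the stream setup is illustrative; see
     * CompoundBloomFilterWriter for the real wiring):
     * <pre>{@code
     * DataOutput out = blockWriter.startWriting(BlockType.ROOT_INDEX);
     * indexWriter.writeSingleLevelIndex(out, "Bloom filter");
     * blockWriter.writeHeaderAndData(fsOut);
     * }</pre>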
     */
    public void writeSingleLevelIndex(DataOutput out, String description)
        throws IOException {
      expectNumLevels(1);

      if (!singleLevelOnly) {
        throw new IOException("Single-level mode is turned off");
      }

      if (rootChunk.getNumEntries() > 0) {
        throw new IOException("Root-level entries already added in " +
            "single-level mode");
      }

      rootChunk = curInlineChunk;
      curInlineChunk = new BlockIndexChunk();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a single-level " + description + " index with "
          + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
          + " bytes");
      }
      rootChunk.writeRoot(out);
    }

    /**
     * Split the current level of the block index into intermediate index
     * blocks of permitted size and write those blocks to disk. Return the next
     * level of the block index referencing those intermediate-level blocks.
     *
     * @param out the output stream to write the intermediate-level blocks to
     * @param currentLevel the current level of the block index, such as a
     *          chunk referencing all leaf-level index blocks
     * @return the parent level block index, which becomes the root index after
     *         a few (usually zero) iterations
     * @throws IOException
     */
    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
        BlockIndexChunk currentLevel) throws IOException {
      // Entries referencing intermediate-level blocks we are about to create.
      BlockIndexChunk parent = new BlockIndexChunk();

      // The current intermediate-level block index chunk.
      BlockIndexChunk curChunk = new BlockIndexChunk();

      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
        curChunk.add(currentLevel.getBlockKey(i),
            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));

        // HBASE-16288: We have to have at least minIndexNumEntries(16) items in the index so that
        // we won't end up with too many levels for an index with very large rowKeys. Also, if the
        // first key is larger than maxChunkSize this will cause infinite recursion.
        if (i >= minIndexNumEntries && curChunk.getRootSize() >= maxChunkSize) {
          writeIntermediateBlock(out, parent, curChunk);
        }
      }

      if (curChunk.getNumEntries() > 0) {
        writeIntermediateBlock(out, parent, curChunk);
      }

      return parent;
    }

    private void writeIntermediateBlock(FSDataOutputStream out,
        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
      long beginOffset = out.getPos();
      DataOutputStream dos = blockWriter.startWriting(
          BlockType.INTERMEDIATE_INDEX);
      curChunk.writeNonRoot(dos);
      byte[] curFirstKey = curChunk.getBlockKey(0);
      blockWriter.writeHeaderAndData(out);

      if (getCacheOnWrite()) {
        cacheConf.getBlockCache().ifPresent(cache -> {
          HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
          cache.cacheBlock(
              new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
              blockForCaching);
        });
      }

      // Add intermediate index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      // OFFSET is the beginning offset of the chunk of block index entries.
      // SIZE is the total byte size of the chunk of block index entries
      // + the secondary index size
      // FIRST_KEY is the first key in the chunk of block index
      // entries.
      parent.add(curFirstKey, beginOffset,
          blockWriter.getOnDiskSizeWithHeader());

      // clear current block index chunk
      curChunk.clear();
      curFirstKey = null;
    }

    /**
     * @return how many block index entries there are in the root level
     */
    public final int getNumRootEntries() {
      return rootChunk.getNumEntries();
    }

    /**
     * @return the number of levels in this block index.
     */
    public int getNumLevels() {
      return numLevels;
    }

    private void expectNumLevels(int expectedNumLevels) {
      if (numLevels != expectedNumLevels) {
        throw new IllegalStateException("Number of block index levels is "
            + numLevels + " but is expected to be " + expectedNumLevels);
      }
    }

    /**
     * Whether there is an inline block ready to be written. In general, we
     * write a leaf-level index block as an inline block as soon as its size
     * as serialized in the non-root format reaches a certain threshold.
     */
    @Override
    public boolean shouldWriteBlock(boolean closing) {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (curInlineChunk == null) {
        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
            "called with closing=true and then called again?");
      }

      if (curInlineChunk.getNumEntries() == 0) {
        return false;
      }

      // We do have some entries in the current inline chunk.
      if (closing) {
        if (rootChunk.getNumEntries() == 0) {
          // We did not add any leaf-level blocks yet. Instead of creating a
          // leaf level with one block, move these entries to the root level.

          expectNumLevels(1);
          rootChunk = curInlineChunk;
          curInlineChunk = null;  // Disallow adding any more index entries.
          return false;
        }

        return true;
      } else {
        return curInlineChunk.getNonRootSize() >= maxChunkSize;
      }
    }

    /**
     * Write out the current inline index block. Inline blocks are non-root
     * blocks, so the non-root index format is used.
     *
     * @param out the output stream to write the inline block to
     */
    @Override
    public void writeInlineBlock(DataOutput out) throws IOException {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      // Write the inline block index to the output stream in the non-root
      // index block format.
      curInlineChunk.writeNonRoot(out);

      // Save the first key of the inline block so that we can add it to the
      // parent-level index.
      firstKey = curInlineChunk.getBlockKey(0);

      // Start a new inline index block
      curInlineChunk.clear();
    }

    /**
     * Called after an inline block has been written so that we can add an
     * entry referring to that block to the parent-level index.
     */
    @Override
    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
      // Add leaf index block size
      totalBlockOnDiskSize += onDiskSize;
      totalBlockUncompressedSize += uncompressedSize;

      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (firstKey == null) {
        throw new IllegalStateException("Trying to add second-level index " +
            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
            " but the first key was not set in writeInlineBlock");
      }

      if (rootChunk.getNumEntries() == 0) {
        // We are writing the first leaf block, so increase index level.
        expectNumLevels(1);
        numLevels = 2;
      }

      // Add another entry to the second-level index. Include the number of
      // entries in all previous leaf-level chunks for mid-key calculation.
      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
      firstKey = null;
    }

    @Override
    public BlockType getInlineBlockType() {
      return BlockType.LEAF_INDEX;
    }
1354
1355    /**
1356     * Add one index entry to the current leaf-level block. When the leaf-level
1357     * block gets large enough, it will be flushed to disk as an inline block.
1358     *
1359     * @param firstKey the first key of the data block
1360     * @param blockOffset the offset of the data block
1361     * @param blockDataSize the on-disk size of the data block ({@link HFile}
1362     *          format version 2), or the uncompressed size of the data block (
1363     *          {@link HFile} format version 1).
1364     */
1365    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
1366      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1367      ++totalNumEntries;
1368    }
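
    /*
     * Minimal usage sketch (assumed caller-side flow): after each data block
     * is flushed, the enclosing HFile writer registers it here so it becomes
     * reachable through the index.
     *
     *   byte[] firstKeyInBlock = ...; // first cell key of the data block
     *   long blockOffset = ...;       // stream position where it was written
     *   int onDiskSize = ...;         // on-disk size of the block
     *   blockIndexWriter.addEntry(firstKeyInBlock, blockOffset, onDiskSize);
     */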
1369
1370    /**
     * @throws IOException if a multi-level index was written.
1372     */
1373    public void ensureSingleLevel() throws IOException {
1374      if (numLevels > 1) {
      throw new IOException("Wrote a " + numLevels + "-level index with " +
1376            rootChunk.getNumEntries() + " root-level entries, but " +
1377            "this is expected to be a single-level block index.");
1378      }
1379    }
1380
1381    /**
     * @return true if we are using cache-on-write. This is determined by the
     *         cache configuration passed to the constructor; passing null
     *         disables cache-on-write.
1385     */
1386    @Override
1387    public boolean getCacheOnWrite() {
1388      return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1389    }
1390
1391    /**
1392     * The total uncompressed size of the root index block, intermediate-level
1393     * index blocks, and leaf-level index blocks.
1394     *
1395     * @return the total uncompressed size of all index blocks
1396     */
1397    public long getTotalUncompressedSize() {
1398      return totalBlockUncompressedSize;
1399    }
1400
1401  }
1402
1403  /**
1404   * A single chunk of the block index in the process of writing. The data in
1405   * this chunk can become a leaf-level, intermediate-level, or root index
1406   * block.
1407   */
1408  static class BlockIndexChunk {
1409
1410    /** First keys of the key range corresponding to each index entry. */
1411    private final List<byte[]> blockKeys = new ArrayList<>();
1412
1413    /** Block offset in backing stream. */
1414    private final List<Long> blockOffsets = new ArrayList<>();
1415
1416    /** On-disk data sizes of lower-level data or index blocks. */
1417    private final List<Integer> onDiskDataSizes = new ArrayList<>();
1418
1419    /**
     * The cumulative number of sub-entries, i.e. entries in the deeper-level
     * blocks that this chunk's entries point to. numSubEntriesAt[i] is the
     * total number of sub-entries in the blocks corresponding to this chunk's
     * entries #0 through #i inclusive.
1423     */
1424    private final List<Long> numSubEntriesAt = new ArrayList<>();
1425
1426    /**
1427     * The offset of the next entry to be added, relative to the end of the
1428     * "secondary index" in the "non-root" format representation of this index
1429     * chunk. This is the next value to be added to the secondary index.
1430     */
1431    private int curTotalNonRootEntrySize = 0;
1432
1433    /**
1434     * The accumulated size of this chunk if stored in the root index format.
1435     */
1436    private int curTotalRootSize = 0;
1437
1438    /**
1439     * The "secondary index" used for binary search over variable-length
1440     * records in a "non-root" format block. These offsets are relative to the
1441     * end of this secondary index.
1442     */
1443    private final List<Integer> secondaryIndexOffsetMarks = new ArrayList<>();
1444
1445    /**
1446     * Adds a new entry to this block index chunk.
1447     *
1448     * @param firstKey the first key in the block pointed to by this entry
1449     * @param blockOffset the offset of the next-level block pointed to by this
1450     *          entry
     * @param onDiskDataSize the on-disk data size of the block pointed to by this
1452     *          entry, including header size
1453     * @param curTotalNumSubEntries if this chunk is the root index chunk under
1454     *          construction, this specifies the current total number of
1455     *          sub-entries in all leaf-level chunks, including the one
1456     *          corresponding to the second-level entry being added.
1457     */
1458    void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1459        long curTotalNumSubEntries) {
1460      // Record the offset for the secondary index
1461      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1462      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1463          + firstKey.length;
1464
1465      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1466          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1467
1468      blockKeys.add(firstKey);
1469      blockOffsets.add(blockOffset);
1470      onDiskDataSizes.add(onDiskDataSize);
1471
1472      if (curTotalNumSubEntries != -1) {
1473        numSubEntriesAt.add(curTotalNumSubEntries);
1474
1475        // Make sure the parallel arrays are in sync.
1476        if (numSubEntriesAt.size() != blockKeys.size()) {
1477          throw new IllegalStateException("Only have key/value count " +
1478              "stats for " + numSubEntriesAt.size() + " block index " +
1479              "entries out of " + blockKeys.size());
1480        }
1481      }
1482    }
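
    /*
     * Size accounting example, following the arithmetic above: an entry with
     * a 20-byte first key grows the non-root entry size by
     * SECONDARY_INDEX_ENTRY_OVERHEAD + 20 = 12 + 20 = 32 bytes (a long offset
     * plus an int on-disk size plus the key, matching what writeNonRoot
     * emits), and the root size by 8 + 4 + 1 + 20 = 33 bytes, where 1 is the
     * vInt encoding of the key length.
     */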
1483
1484    /**
1485     * The same as {@link #add(byte[], long, int, long)} but does not take the
1486     * key/value into account. Used for single-level indexes.
1487     *
1488     * @see #add(byte[], long, int, long)
1489     */
1490    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1491      add(firstKey, blockOffset, onDiskDataSize, -1);
1492    }
1493
1494    public void clear() {
1495      blockKeys.clear();
1496      blockOffsets.clear();
1497      onDiskDataSizes.clear();
1498      secondaryIndexOffsetMarks.clear();
1499      numSubEntriesAt.clear();
1500      curTotalNonRootEntrySize = 0;
1501      curTotalRootSize = 0;
1502    }
1503
1504    /**
1505     * Finds the entry corresponding to the deeper-level index block containing
1506     * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1507     * ordering of sub-entries.
1508     *
1509     * <p>
1510     * <i> Implementation note. </i> We are looking for i such that
1511     * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
1512     * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
1513     * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
1514     * sub-entries. i is by definition the insertion point of k in
1515     * numSubEntriesAt.
1516     *
     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1518     * @return the 0-based index of the entry corresponding to the given
1519     *         sub-entry
1520     */
1521    public int getEntryBySubEntry(long k) {
1522      // We define mid-key as the key corresponding to k'th sub-entry
1523      // (0-based).
1524
1525      int i = Collections.binarySearch(numSubEntriesAt, k);
1526
      // Exact match: numSubEntriesAt[i] == k. This means chunks #0 through
1528      // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1529      // is in the (i + 1)'th chunk.
      if (i >= 0) {
        return i + 1;
      }
1532
1533      // Inexact match. Return the insertion point.
1534      return -i - 1;
1535    }
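
    /*
     * Worked example: if numSubEntriesAt = [3, 7, 10], leaf block #0 holds
     * sub-entries #0..#2, block #1 holds #3..#6, and block #2 holds #7..#9.
     * For k = 5, binarySearch returns -2 (insertion point 1), so we return
     * -(-2) - 1 = 1: sub-entry #5 is in block #1. For k = 3, binarySearch
     * finds an exact match at index 0 and we return 0 + 1 = 1, again block
     * #1, because block #0 ends at sub-entry #2.
     */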
1536
1537    /**
1538     * Used when writing the root block index of a multi-level block index.
     * Serializes additional information that allows the reader to efficiently
     * identify the mid-key.
1541     *
1542     * @return a few serialized fields for finding the mid-key
1543     * @throws IOException if could not create metadata for computing mid-key
1544     */
1545    public byte[] getMidKeyMetadata() throws IOException {
1546      ByteArrayOutputStream baos = new ByteArrayOutputStream(
1547          MID_KEY_METADATA_SIZE);
1548      DataOutputStream baosDos = new DataOutputStream(baos);
1549      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
1550      if (totalNumSubEntries == 0) {
1551        throw new IOException("No leaf-level entries, mid-key unavailable");
1552      }
1553      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1554      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1555
1556      baosDos.writeLong(blockOffsets.get(midKeyEntry));
1557      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1558
1559      long numSubEntriesBefore = midKeyEntry > 0
1560          ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1561      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE) {
1564        throw new IOException("Could not identify mid-key index within the "
1565            + "leaf-level block containing mid-key: out of range ("
1566            + subEntryWithinEntry + ", numSubEntriesBefore="
1567            + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1568            + ")");
1569      }
1570
1571      baosDos.writeInt((int) subEntryWithinEntry);
1572
1573      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1574        throw new IOException("Could not write mid-key metadata: size=" +
1575            baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1576      }
1577
1578      // Close just to be good citizens, although this has no effect.
1579      baos.close();
1580
1581      return baos.toByteArray();
1582    }
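
    /*
     * The metadata produced above is therefore 16 bytes: an 8-byte offset of
     * the leaf block containing the mid-key, the 4-byte on-disk size of that
     * block, and a 4-byte position of the mid-key within it, so
     * MID_KEY_METADATA_SIZE must equal
     * Bytes.SIZEOF_LONG + 2 * Bytes.SIZEOF_INT.
     */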
1583
1584    /**
1585     * Writes the block index chunk in the non-root index block format. This
1586     * format contains the number of entries, an index of integer offsets
1587     * for quick binary search on variable-length records, and tuples of
1588     * block offset, on-disk block size, and the first key for each entry.
1589     *
     * @param out the data output stream to write the index block to
     * @throws IOException if the write fails
1592     */
1593    void writeNonRoot(DataOutput out) throws IOException {
1594      // The number of entries in the block.
1595      out.writeInt(blockKeys.size());
1596
1597      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1598        throw new IOException("Corrupted block index chunk writer: " +
1599            blockKeys.size() + " entries but " +
1600            secondaryIndexOffsetMarks.size() + " secondary index items");
1601      }
1602
1603      // For each entry, write a "secondary index" of relative offsets to the
      // entries from the end of the secondary index. This works because at
1605      // read time we read the number of entries and know where the secondary
1606      // index ends.
      for (int currentSecondaryIndex : secondaryIndexOffsetMarks) {
        out.writeInt(currentSecondaryIndex);
      }
1609
      // We include one additional element in the secondary index so that the
      // size of each entry can be computed by subtracting consecutive
      // secondary index elements.
1612      out.writeInt(curTotalNonRootEntrySize);
1613
1614      for (int i = 0; i < blockKeys.size(); ++i) {
1615        out.writeLong(blockOffsets.get(i));
1616        out.writeInt(onDiskDataSizes.get(i));
1617        out.write(blockKeys.get(i));
1618      }
1619    }
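
    /*
     * Layout example for a two-entry chunk with 3-byte and 5-byte keys
     * (offsets and sizes are illustrative):
     *
     *   int  2    // number of entries
     *   int  0    // secondary index: entry #0 starts at relative offset 0
     *   int  15   // entry #1 starts at 15 (8 + 4 bytes overhead + 3-byte key)
     *   int  32   // total entry bytes; closes the secondary index
     *   entry #0: long offset, int on-disk size, 3-byte key
     *   entry #1: long offset, int on-disk size, 5-byte key
     */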
1620
1621    /**
1622     * @return the size of this chunk if stored in the non-root index block
1623     *         format
1624     */
1625    int getNonRootSize() {
1626      return Bytes.SIZEOF_INT                          // Number of entries
1627          + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1628          + curTotalNonRootEntrySize;                  // All entries
1629    }
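
    /*
     * For the two-entry chunk sketched above writeNonRoot, this is
     * 4 + 4 * (2 + 1) + 32 = 48 bytes; shouldWriteBlock compares this value
     * against maxChunkSize to decide when to flush a leaf chunk.
     */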
1630
1631    /**
1632     * Writes this chunk into the given output stream in the root block index
1633     * format. This format is similar to the {@link HFile} version 1 block
1634     * index format, except that we store on-disk size of the block instead of
1635     * its uncompressed size.
1636     *
1637     * @param out the data output stream to write the block index to. Typically
1638     *          a stream writing into an {@link HFile} block.
     * @throws IOException if the write fails
1640     */
1641    void writeRoot(DataOutput out) throws IOException {
1642      for (int i = 0; i < blockKeys.size(); ++i) {
1643        out.writeLong(blockOffsets.get(i));
1644        out.writeInt(onDiskDataSizes.get(i));
1645        Bytes.writeByteArray(out, blockKeys.get(i));
1646      }
1647    }
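
    /*
     * Unlike the non-root format, no entry count or secondary index is
     * written here; the reader obtains the number of root entries from the
     * file trailer. Each entry is an 8-byte offset, a 4-byte on-disk size,
     * and the key written as a vInt length followed by the key bytes.
     */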
1648
1649    /**
1650     * @return the size of this chunk if stored in the root index block format
1651     */
1652    int getRootSize() {
1653      return curTotalRootSize;
1654    }
1655
1656    /**
1657     * @return the number of entries in this block index chunk
1658     */
1659    public int getNumEntries() {
1660      return blockKeys.size();
1661    }
1662
1663    public byte[] getBlockKey(int i) {
1664      return blockKeys.get(i);
1665    }
1666
1667    public long getBlockOffset(int i) {
1668      return blockOffsets.get(i);
1669    }
1670
1671    public int getOnDiskDataSize(int i) {
1672      return onDiskDataSizes.get(i);
1673    }
1674
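    /**
     * @param i the 0-based index of a root-level entry, or a negative value
     * @return the cumulative number of sub-entries in the leaf blocks covered
     *         by entries #0 through #i inclusive, or 0 if i is negative
     */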
1675    public long getCumulativeNumKV(int i) {
      if (i < 0) {
        return 0;
      }
1678      return numSubEntriesAt.get(i);
1679    }
1680
1681  }
1682
1683  public static int getMaxChunkSize(Configuration conf) {
1684    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1685  }
1686
1687  public static int getMinIndexNumEntries(Configuration conf) {
1688    return conf.getInt(MIN_INDEX_NUM_ENTRIES_KEY, DEFAULT_MIN_INDEX_NUM_ENTRIES);
1689  }
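
  /*
   * Configuration sketch: both knobs are read from the site/job
   * configuration; the values below are only examples, not recommendations.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
   *   int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf);
   *   int minEntries = HFileBlockIndex.getMinIndexNumEntries(conf);
   */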
1690}