001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile;
020
021import java.io.ByteArrayOutputStream;
022import java.io.DataInput;
023import java.io.DataInputStream;
024import java.io.DataOutput;
025import java.io.DataOutputStream;
026import java.io.IOException;
027import java.nio.ByteBuffer;
028import java.util.ArrayList;
029import java.util.Collections;
030import java.util.List;
031import java.util.concurrent.atomic.AtomicReference;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellComparator;
038//import org.apache.hadoop.hbase.CellComparatorImpl;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.PrivateCellUtil;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046import org.apache.hadoop.hbase.io.HeapSize;
047import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
048import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
049import org.apache.hadoop.hbase.nio.ByteBuff;
050import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.ClassSize;
053import org.apache.hadoop.hbase.util.ObjectIntPair;
054import org.apache.hadoop.io.WritableUtils;
055import org.apache.hadoop.util.StringUtils;
056
057/**
058 * Provides functionality to write ({@link BlockIndexWriter}) and read
059 * BlockIndexReader
060 * single-level and multi-level block indexes.
061 *
062 * Examples of how to use the block index writer can be found in
063 * {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
064 *  {@link HFileWriterImpl}. Examples of how to use the reader can be
065 *  found in {@link HFileReaderImpl} and
066 *  org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex.
067 */
068@InterfaceAudience.Private
069public class HFileBlockIndex {
070
071  private static final Logger LOG = LoggerFactory.getLogger(HFileBlockIndex.class);
072
073  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
074
075  /**
076   * The maximum size guideline for index blocks (both leaf, intermediate, and
077   * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
078   */
079  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
080
081  /**
082   * Minimum number of entries in a single index block. Even if we are above the
083   * hfile.index.block.max.size we will keep writing to the same block unless we have that many
084   * entries. We should have at least a few entries so that we don't have too many levels in the
085   * multi-level index. This should be at least 2 to make sure there is no infinite recursion.
086   */
087  public static final String MIN_INDEX_NUM_ENTRIES_KEY = "hfile.index.block.min.entries";
088
089  static final int DEFAULT_MIN_INDEX_NUM_ENTRIES = 16;
090
091  /**
092   * The number of bytes stored in each "secondary index" entry in addition to
093   * key bytes in the non-root index block format. The first long is the file
094   * offset of the deeper-level block the entry points to, and the int that
095   * follows is that block's on-disk size without including header.
096   */
097  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
098      + Bytes.SIZEOF_LONG;
099
100  /**
101   * Error message when trying to use inline block API in single-level mode.
102   */
103  private static final String INLINE_BLOCKS_NOT_ALLOWED =
104      "Inline blocks are not allowed in the single-level-only mode";
105
106  /**
107   * The size of a meta-data record used for finding the mid-key in a
108   * multi-level index. Consists of the middle leaf-level index block offset
109   * (long), its on-disk size without header included (int), and the mid-key
110   * entry's zero-based index in that leaf index block.
111   */
112  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
113      2 * Bytes.SIZEOF_INT;
114
115  /**
116   * An implementation of the BlockIndexReader that deals with block keys which are plain
117   * byte[] like MetaBlock or the Bloom Block for ROW bloom.
118   * Does not need a comparator. It can work on Bytes.BYTES_RAWCOMPARATOR
119   */
120   static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {
121
122    private byte[][] blockKeys;
123
124    public ByteArrayKeyBlockIndexReader(final int treeLevel) {
125      // Can be null for METAINDEX block
126      searchTreeLevel = treeLevel;
127    }
128
129    @Override
130    protected long calculateHeapSizeForBlockKeys(long heapSize) {
131      // Calculating the size of blockKeys
132      if (blockKeys != null) {
133        heapSize += ClassSize.REFERENCE;
134        // Adding array + references overhead
135        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
136
137        // Adding bytes
138        for (byte[] key : blockKeys) {
139          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
140        }
141      }
142      return heapSize;
143    }
144
145    @Override
146    public boolean isEmpty() {
147      return blockKeys.length == 0;
148    }
149
150    /**
151     * @param i
152     *          from 0 to {@link #getRootBlockCount() - 1}
153     */
154    public byte[] getRootBlockKey(int i) {
155      return blockKeys[i];
156    }
157
158    @Override
159    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
160        boolean cacheBlocks, boolean pread, boolean isCompaction,
161        DataBlockEncoding expectedDataBlockEncoding,
162        CachingBlockReader cachingBlockReader) throws IOException {
163      // this would not be needed
164      return null;
165    }
166
167    @Override
168    public Cell midkey(CachingBlockReader cachingBlockReader) throws IOException {
169      // Not needed here
170      return null;
171    }
172
173    @Override
174    protected void initialize(int numEntries) {
175      blockKeys = new byte[numEntries][];
176    }
177
178    @Override
179    protected void add(final byte[] key, final long offset, final int dataSize) {
180      blockOffsets[rootCount] = offset;
181      blockKeys[rootCount] = key;
182      blockDataSizes[rootCount] = dataSize;
183      rootCount++;
184    }
185
186    @Override
187    public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
188      int pos = Bytes.binarySearch(blockKeys, key, offset, length);
189      // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
190      // binarySearch's javadoc.
191
192      if (pos >= 0) {
193        // This means this is an exact match with an element of blockKeys.
194        assert pos < blockKeys.length;
195        return pos;
196      }
197
198      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
199      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
200      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
201      // key < blockKeys[0], meaning the file does not contain the given key.
202
203      int i = -pos - 1;
204      assert 0 <= i && i <= blockKeys.length;
205      return i - 1;
206    }
207
208    @Override
209    public int rootBlockContainingKey(Cell key) {
210      // Should not be called on this because here it deals only with byte[]
211      throw new UnsupportedOperationException(
212          "Cannot search for a key that is of Cell type. Only plain byte array keys " +
213          "can be searched for");
214    }
215
216    @Override
217    public String toString() {
218      StringBuilder sb = new StringBuilder();
219      sb.append("size=" + rootCount).append("\n");
220      for (int i = 0; i < rootCount; i++) {
221        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
222            .append("\n  offset=").append(blockOffsets[i])
223            .append(", dataSize=" + blockDataSizes[i]).append("\n");
224      }
225      return sb.toString();
226    }
227  }
228
229  /**
230   * An implementation of the BlockIndexReader that deals with block keys which are the key
231   * part of a cell like the Data block index or the ROW_COL bloom blocks
232   * This needs a comparator to work with the Cells
233   */
234  static class CellBasedKeyBlockIndexReader extends BlockIndexReader {
235
236    private Cell[] blockKeys;
237    /** Pre-computed mid-key */
238    private AtomicReference<Cell> midKey = new AtomicReference<>();
239    /** Needed doing lookup on blocks. */
240    private CellComparator comparator;
241
242    public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
243      // Can be null for METAINDEX block
244      comparator = c;
245      searchTreeLevel = treeLevel;
246    }
247
248    @Override
249    protected long calculateHeapSizeForBlockKeys(long heapSize) {
250      if (blockKeys != null) {
251        heapSize += ClassSize.REFERENCE;
252        // Adding array + references overhead
253        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
254
255        // Adding blockKeys
256        for (Cell key : blockKeys) {
257          heapSize += ClassSize.align(key.heapSize());
258        }
259      }
260      // Add comparator and the midkey atomicreference
261      heapSize += 2 * ClassSize.REFERENCE;
262      return heapSize;
263    }
264
265    @Override
266    public boolean isEmpty() {
267      return blockKeys.length == 0;
268    }
269
270    /**
271     * @param i
272     *          from 0 to {@link #getRootBlockCount() - 1}
273     */
274    public Cell getRootBlockKey(int i) {
275      return blockKeys[i];
276    }
277
278    @Override
279    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
280        boolean cacheBlocks, boolean pread, boolean isCompaction,
281        DataBlockEncoding expectedDataBlockEncoding,
282        CachingBlockReader cachingBlockReader) throws IOException {
283      int rootLevelIndex = rootBlockContainingKey(key);
284      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
285        return null;
286      }
287
288      // the next indexed key
289      Cell nextIndexedKey = null;
290
291      // Read the next-level (intermediate or leaf) index block.
292      long currentOffset = blockOffsets[rootLevelIndex];
293      int currentOnDiskSize = blockDataSizes[rootLevelIndex];
294
295      if (rootLevelIndex < blockKeys.length - 1) {
296        nextIndexedKey = blockKeys[rootLevelIndex + 1];
297      } else {
298        nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
299      }
300
301      int lookupLevel = 1; // How many levels deep we are in our lookup.
302      int index = -1;
303
304      HFileBlock block = null;
305      KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
306      while (true) {
307        try {
308          // Must initialize it with null here, because if don't and once an exception happen in
309          // readBlock, then we'll release the previous assigned block twice in the finally block.
310          // (See HBASE-22422)
311          block = null;
312          if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
313            // Avoid reading the same block again, even with caching turned off.
314            // This is crucial for compaction-type workload which might have
315            // caching turned off. This is like a one-block cache inside the
316            // scanner.
317            block = currentBlock;
318          } else {
319            // Call HFile's caching block reader API. We always cache index
320            // blocks, otherwise we might get terrible performance.
321            boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
322            BlockType expectedBlockType;
323            if (lookupLevel < searchTreeLevel - 1) {
324              expectedBlockType = BlockType.INTERMEDIATE_INDEX;
325            } else if (lookupLevel == searchTreeLevel - 1) {
326              expectedBlockType = BlockType.LEAF_INDEX;
327            } else {
328              // this also accounts for ENCODED_DATA
329              expectedBlockType = BlockType.DATA;
330            }
331            block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache,
332              pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
333          }
334
335          if (block == null) {
336            throw new IOException("Failed to read block at offset " + currentOffset
337                + ", onDiskSize=" + currentOnDiskSize);
338          }
339
340          // Found a data block, break the loop and check our level in the tree.
341          if (block.getBlockType().isData()) {
342            break;
343          }
344
345          // Not a data block. This must be a leaf-level or intermediate-level
346          // index block. We don't allow going deeper than searchTreeLevel.
347          if (++lookupLevel > searchTreeLevel) {
348            throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
349                + ", searchTreeLevel=" + searchTreeLevel);
350          }
351
352          // Locate the entry corresponding to the given key in the non-root
353          // (leaf or intermediate-level) index block.
354          ByteBuff buffer = block.getBufferWithoutHeader();
355          index = locateNonRootIndexEntry(buffer, key, comparator);
356          if (index == -1) {
357            // This has to be changed
358            // For now change this to key value
359            throw new IOException("The key "
360                + CellUtil.getCellKeyAsString(key)
361                + " is before the" + " first key of the non-root index block " + block);
362          }
363
364          currentOffset = buffer.getLong();
365          currentOnDiskSize = buffer.getInt();
366
367          // Only update next indexed key if there is a next indexed key in the current level
368          byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
369          if (nonRootIndexedKey != null) {
370            tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
371            nextIndexedKey = tmpNextIndexKV;
372          }
373        } finally {
374          if (block != null && !block.getBlockType().isData()) {
375            // Release the block immediately if it is not the data block
376            block.release();
377          }
378        }
379      }
380
381      if (lookupLevel != searchTreeLevel) {
382        assert block.getBlockType().isData();
383        // Though we have retrieved a data block we have found an issue
384        // in the retrieved data block. Hence returned the block so that
385        // the ref count can be decremented
386        if (block != null) {
387          block.release();
388        }
389        throw new IOException("Reached a data block at level " + lookupLevel
390            + " but the number of levels is " + searchTreeLevel);
391      }
392
393      // set the next indexed key for the current block.
394      return new BlockWithScanInfo(block, nextIndexedKey);
395    }
396
397    @Override
398    public Cell midkey(CachingBlockReader cachingBlockReader) throws IOException {
399      if (rootCount == 0)
400        throw new IOException("HFile empty");
401
402      Cell targetMidKey = this.midKey.get();
403      if (targetMidKey != null) {
404        return targetMidKey;
405      }
406
407      if (midLeafBlockOffset >= 0) {
408        if (cachingBlockReader == null) {
409          throw new IOException("Have to read the middle leaf block but " +
410              "no block reader available");
411        }
412
413        // Caching, using pread, assuming this is not a compaction.
414        HFileBlock midLeafBlock = cachingBlockReader.readBlock(
415            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
416            BlockType.LEAF_INDEX, null);
417        try {
418          ByteBuff b = midLeafBlock.getBufferWithoutHeader();
419          int numDataBlocks = b.getIntAfterPosition(0);
420          int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
421          int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset
422              - SECONDARY_INDEX_ENTRY_OVERHEAD;
423          int keyOffset =
424              Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
425                  + SECONDARY_INDEX_ENTRY_OVERHEAD;
426          byte[] bytes = b.toBytes(keyOffset, keyLen);
427          targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
428        } finally {
429          midLeafBlock.release();
430        }
431      } else {
432        // The middle of the root-level index.
433        targetMidKey = blockKeys[rootCount / 2];
434      }
435
436      this.midKey.set(targetMidKey);
437      return targetMidKey;
438    }
439
440    @Override
441    protected void initialize(int numEntries) {
442      blockKeys = new Cell[numEntries];
443    }
444
445    /**
446     * Adds a new entry in the root block index. Only used when reading.
447     *
448     * @param key Last key in the block
449     * @param offset file offset where the block is stored
450     * @param dataSize the uncompressed data size
451     */
452    @Override
453    protected void add(final byte[] key, final long offset, final int dataSize) {
454      blockOffsets[rootCount] = offset;
455      // Create the blockKeys as Cells once when the reader is opened
456      blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
457      blockDataSizes[rootCount] = dataSize;
458      rootCount++;
459    }
460
461    @Override
462    public int rootBlockContainingKey(final byte[] key, int offset, int length,
463        CellComparator comp) {
464      // This should always be called with Cell not with a byte[] key
465      throw new UnsupportedOperationException("Cannot find for a key containing plain byte " +
466          "array. Only cell based keys can be searched for");
467    }
468
469    @Override
470    public int rootBlockContainingKey(Cell key) {
471      // Here the comparator should not be null as this happens for the root-level block
472      int pos = Bytes.binarySearch(blockKeys, key, comparator);
473      // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
474      // binarySearch's javadoc.
475
476      if (pos >= 0) {
477        // This means this is an exact match with an element of blockKeys.
478        assert pos < blockKeys.length;
479        return pos;
480      }
481
482      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
483      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
484      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
485      // key < blockKeys[0], meaning the file does not contain the given key.
486
487      int i = -pos - 1;
488      assert 0 <= i && i <= blockKeys.length;
489      return i - 1;
490    }
491
492    @Override
493    public String toString() {
494      StringBuilder sb = new StringBuilder();
495      sb.append("size=" + rootCount).append("\n");
496      for (int i = 0; i < rootCount; i++) {
497        sb.append("key=").append((blockKeys[i]))
498            .append("\n  offset=").append(blockOffsets[i])
499            .append(", dataSize=" + blockDataSizes[i]).append("\n");
500      }
501      return sb.toString();
502    }
503  }
504
505  /**
506   * The reader will always hold the root level index in the memory. Index
507   * blocks at all other levels will be cached in the LRU cache in practice,
508   * although this API does not enforce that.
509   *
510   * <p>All non-root (leaf and intermediate) index blocks contain what we call a
511   * "secondary index": an array of offsets to the entries within the block.
512   * This allows us to do binary search for the entry corresponding to the
513   * given key without having to deserialize the block.
514   */
515  static abstract class BlockIndexReader implements HeapSize {
516
517    protected long[] blockOffsets;
518    protected int[] blockDataSizes;
519    protected int rootCount = 0;
520
521    // Mid-key metadata.
522    protected long midLeafBlockOffset = -1;
523    protected int midLeafBlockOnDiskSize = -1;
524    protected int midKeyEntry = -1;
525
526    /**
527     * The number of levels in the block index tree. One if there is only root
528     * level, two for root and leaf levels, etc.
529     */
530    protected int searchTreeLevel;
531
532    /**
533     * @return true if the block index is empty.
534     */
535    public abstract boolean isEmpty();
536
537    /**
538     * Verifies that the block index is non-empty and throws an
539     * {@link IllegalStateException} otherwise.
540     */
541    public void ensureNonEmpty() {
542      if (isEmpty()) {
543        throw new IllegalStateException("Block index is empty or not loaded");
544      }
545    }
546
547    /**
548     * Return the data block which contains this key. This function will only
549     * be called when the HFile version is larger than 1.
550     *
551     * @param key the key we are looking for
552     * @param currentBlock the current block, to avoid re-reading the same block
553     * @param cacheBlocks
554     * @param pread
555     * @param isCompaction
556     * @param expectedDataBlockEncoding the data block encoding the caller is
557     *          expecting the data block to be in, or null to not perform this
558     *          check and return the block irrespective of the encoding
559     * @return reader a basic way to load blocks
560     * @throws IOException
561     */
562    public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
563        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding,
564        CachingBlockReader cachingBlockReader) throws IOException {
565      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
566        cacheBlocks, pread, isCompaction, expectedDataBlockEncoding, cachingBlockReader);
567      if (blockWithScanInfo == null) {
568        return null;
569      } else {
570        return blockWithScanInfo.getHFileBlock();
571      }
572    }
573
574    /**
575     * Return the BlockWithScanInfo, a data structure which contains the Data HFileBlock with
576     * other scan info such as the key that starts the next HFileBlock. This function will only
577     * be called when the HFile version is larger than 1.
578     *
579     * @param key the key we are looking for
580     * @param currentBlock the current block, to avoid re-reading the same block
581     * @param expectedDataBlockEncoding the data block encoding the caller is
582     *          expecting the data block to be in, or null to not perform this
583     *          check and return the block irrespective of the encoding.
584     * @return the BlockWithScanInfo which contains the DataBlock with other
585     *         scan info such as nextIndexedKey.
586     * @throws IOException
587     */
588    public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
589        boolean cacheBlocks, boolean pread, boolean isCompaction,
590        DataBlockEncoding expectedDataBlockEncoding,
591        CachingBlockReader cachingBlockReader) throws IOException;
592
593    /**
594     * An approximation to the {@link HFile}'s mid-key. Operates on block
595     * boundaries, and does not go inside blocks. In other words, returns the
596     * first key of the middle block of the file.
597     *
598     * @return the first key of the middle block
599     */
600    public abstract Cell midkey(CachingBlockReader cachingBlockReader) throws IOException;
601
602    /**
603     * @param i from 0 to {@link #getRootBlockCount() - 1}
604     */
605    public long getRootBlockOffset(int i) {
606      return blockOffsets[i];
607    }
608
609    /**
610     * @param i zero-based index of a root-level block
611     * @return the on-disk size of the root-level block for version 2, or the
612     *         uncompressed size for version 1
613     */
614    public int getRootBlockDataSize(int i) {
615      return blockDataSizes[i];
616    }
617
618    /**
619     * @return the number of root-level blocks in this block index
620     */
621    public int getRootBlockCount() {
622      return rootCount;
623    }
624
625    /**
626     * Finds the root-level index block containing the given key.
627     *
628     * @param key
629     *          Key to find
630     * @param comp
631     *          the comparator to be used
632     * @return Offset of block containing <code>key</code> (between 0 and the
633     *         number of blocks - 1) or -1 if this file does not contain the
634     *         request.
635     */
636    // When we want to find the meta index block or bloom block for ROW bloom
637    // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
638    // CellComparator.
639    public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
640        CellComparator comp);
641
642    /**
643     * Finds the root-level index block containing the given key.
644     *
645     * @param key
646     *          Key to find
647     * @return Offset of block containing <code>key</code> (between 0 and the
648     *         number of blocks - 1) or -1 if this file does not contain the
649     *         request.
650     */
651    // When we want to find the meta index block or bloom block for ROW bloom
652    // type
653    // Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we
654    // need the CellComparator.
655    public int rootBlockContainingKey(final byte[] key, int offset, int length) {
656      return rootBlockContainingKey(key, offset, length, null);
657    }
658
659    /**
660     * Finds the root-level index block containing the given key.
661     *
662     * @param key
663     *          Key to find
664     */
665    public abstract int rootBlockContainingKey(final Cell key);
666
667    /**
668     * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
669     * @param nonRootIndex
670     * @param i the ith position
671     * @return The indexed key at the ith position in the nonRootIndex.
672     */
673    protected byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
674      int numEntries = nonRootIndex.getInt(0);
675      if (i < 0 || i >= numEntries) {
676        return null;
677      }
678
679      // Entries start after the number of entries and the secondary index.
680      // The secondary index takes numEntries + 1 ints.
681      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
682      // Targetkey's offset relative to the end of secondary index
683      int targetKeyRelOffset = nonRootIndex.getInt(
684          Bytes.SIZEOF_INT * (i + 1));
685
686      // The offset of the target key in the blockIndex buffer
687      int targetKeyOffset = entriesOffset     // Skip secondary index
688          + targetKeyRelOffset               // Skip all entries until mid
689          + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
690
691      // We subtract the two consecutive secondary index elements, which
692      // gives us the size of the whole (offset, onDiskSize, key) tuple. We
693      // then need to subtract the overhead of offset and onDiskSize.
694      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
695        targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
696
697      // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
698      return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
699    }
700
701    /**
702     * Performs a binary search over a non-root level index block. Utilizes the
703     * secondary index, which records the offsets of (offset, onDiskSize,
704     * firstKey) tuples of all entries.
705     *
706     * @param key
707     *          the key we are searching for offsets to individual entries in
708     *          the blockIndex buffer
709     * @param nonRootIndex
710     *          the non-root index block buffer, starting with the secondary
711     *          index. The position is ignored.
712     * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
713     *         keys[i + 1], if keys is the array of all keys being searched, or
714     *         -1 otherwise
715     * @throws IOException
716     */
717    static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
718        CellComparator comparator) {
719
720      int numEntries = nonRootIndex.getIntAfterPosition(0);
721      int low = 0;
722      int high = numEntries - 1;
723      int mid = 0;
724
725      // Entries start after the number of entries and the secondary index.
726      // The secondary index takes numEntries + 1 ints.
727      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
728
729      // If we imagine that keys[-1] = -Infinity and
730      // keys[numEntries] = Infinity, then we are maintaining an invariant that
731      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
732      ByteBufferKeyOnlyKeyValue nonRootIndexkeyOnlyKV = new ByteBufferKeyOnlyKeyValue();
733      ObjectIntPair<ByteBuffer> pair = new ObjectIntPair<>();
734      while (low <= high) {
735        mid = low + ((high - low) >> 1);
736
737        // Midkey's offset relative to the end of secondary index
738        int midKeyRelOffset = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 1));
739
740        // The offset of the middle key in the blockIndex buffer
741        int midKeyOffset = entriesOffset       // Skip secondary index
742            + midKeyRelOffset                  // Skip all entries until mid
743            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
744
745        // We subtract the two consecutive secondary index elements, which
746        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
747        // then need to subtract the overhead of offset and onDiskSize.
748        int midLength = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 2)) -
749            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
750
751        // we have to compare in this order, because the comparator order
752        // has special logic when the 'left side' is a special key.
753        // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
754        // done after HBASE-12224 & HBASE-12282
755        // TODO avoid array call.
756        nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
757        nonRootIndexkeyOnlyKV.setKey(pair.getFirst(), pair.getSecond(), midLength);
758        int cmp = PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, nonRootIndexkeyOnlyKV);
759
760        // key lives above the midpoint
761        if (cmp > 0)
762          low = mid + 1; // Maintain the invariant that keys[low - 1] < key
763        // key lives below the midpoint
764        else if (cmp < 0)
765          high = mid - 1; // Maintain the invariant that key < keys[high + 1]
766        else
767          return mid; // exact match
768      }
769
770      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
771      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
772      // condition, low >= high + 1. Therefore, low = high + 1.
773
774      if (low != high + 1) {
775        throw new IllegalStateException("Binary search broken: low=" + low
776            + " " + "instead of " + (high + 1));
777      }
778
779      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
780      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
781      int i = low - 1;
782
783      // Some extra validation on the result.
784      if (i < -1 || i >= numEntries) {
785        throw new IllegalStateException("Binary search broken: result is " +
786            i + " but expected to be between -1 and (numEntries - 1) = " +
787            (numEntries - 1));
788      }
789
790      return i;
791    }
792
793    /**
794     * Search for one key using the secondary index in a non-root block. In case
795     * of success, positions the provided buffer at the entry of interest, where
796     * the file offset and the on-disk-size can be read.
797     *
798     * @param nonRootBlock
799     *          a non-root block without header. Initial position does not
800     *          matter.
801     * @param key
802     *          the byte array containing the key
803     * @return the index position where the given key was found, otherwise
804     *         return -1 in the case the given key is before the first key.
805     *
806     */
807    static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
808        CellComparator comparator) {
809      int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
810
811      if (entryIndex != -1) {
812        int numEntries = nonRootBlock.getIntAfterPosition(0);
813
814        // The end of secondary index and the beginning of entries themselves.
815        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
816
817        // The offset of the entry we are interested in relative to the end of
818        // the secondary index.
819        int entryRelOffset = nonRootBlock
820            .getIntAfterPosition(Bytes.SIZEOF_INT * (1 + entryIndex));
821
822        nonRootBlock.position(entriesOffset + entryRelOffset);
823      }
824
825      return entryIndex;
826    }
827
828    /**
829     * Read in the root-level index from the given input stream. Must match
830     * what was written into the root level by
831     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
832     * offset that function returned.
833     *
834     * @param in the buffered input stream or wrapped byte input stream
835     * @param numEntries the number of root-level index entries
836     * @throws IOException
837     */
838    public void readRootIndex(DataInput in, final int numEntries) throws IOException {
839      blockOffsets = new long[numEntries];
840      initialize(numEntries);
841      blockDataSizes = new int[numEntries];
842
843      // If index size is zero, no index was written.
844      if (numEntries > 0) {
845        for (int i = 0; i < numEntries; ++i) {
846          long offset = in.readLong();
847          int dataSize = in.readInt();
848          byte[] key = Bytes.readByteArray(in);
849          add(key, offset, dataSize);
850        }
851      }
852    }
853
854    protected abstract void initialize(int numEntries);
855
856    protected abstract void add(final byte[] key, final long offset, final int dataSize);
857
858    /**
859     * Read in the root-level index from the given input stream. Must match
860     * what was written into the root level by
861     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
862     * offset that function returned.
863     *
864     * @param blk the HFile block
865     * @param numEntries the number of root-level index entries
866     * @return the buffered input stream or wrapped byte input stream
867     * @throws IOException
868     */
869    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
870      DataInputStream in = blk.getByteStream();
871      readRootIndex(in, numEntries);
872      return in;
873    }
874
875    /**
876     * Read the root-level metadata of a multi-level block index. Based on
877     * {@link #readRootIndex(DataInput, int)}, but also reads metadata
878     * necessary to compute the mid-key in a multi-level index.
879     *
880     * @param blk the HFile block
881     * @param numEntries the number of root-level index entries
882     * @throws IOException
883     */
884    public void readMultiLevelIndexRoot(HFileBlock blk,
885        final int numEntries) throws IOException {
886      DataInputStream in = readRootIndex(blk, numEntries);
887      // after reading the root index the checksum bytes have to
888      // be subtracted to know if the mid key exists.
889      int checkSumBytes = blk.totalChecksumBytes();
890      if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
891        // No mid-key metadata available.
892        return;
893      }
894      midLeafBlockOffset = in.readLong();
895      midLeafBlockOnDiskSize = in.readInt();
896      midKeyEntry = in.readInt();
897    }
898
899    @Override
900    public long heapSize() {
901      // The BlockIndexReader does not have the blockKey, comparator and the midkey atomic reference
902      long heapSize = ClassSize.align(3 * ClassSize.REFERENCE +
903          2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
904
905      // Mid-key metadata.
906      heapSize += MID_KEY_METADATA_SIZE;
907
908      heapSize = calculateHeapSizeForBlockKeys(heapSize);
909
910      if (blockOffsets != null) {
911        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
912            * Bytes.SIZEOF_LONG);
913      }
914
915      if (blockDataSizes != null) {
916        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
917            * Bytes.SIZEOF_INT);
918      }
919
920      return ClassSize.align(heapSize);
921    }
922
923    protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
924  }
925
926  /**
927   * Writes the block index into the output stream. Generate the tree from
928   * bottom up. The leaf level is written to disk as a sequence of inline
929   * blocks, if it is larger than a certain number of bytes. If the leaf level
930   * is not large enough, we write all entries to the root level instead.
931   *
932   * After all leaf blocks have been written, we end up with an index
933   * referencing the resulting leaf index blocks. If that index is larger than
934   * the allowed root index size, the writer will break it up into
935   * reasonable-size intermediate-level index block chunks write those chunks
936   * out, and create another index referencing those chunks. This will be
937   * repeated until the remaining index is small enough to become the root
938   * index. However, in most practical cases we will only have leaf-level
939   * blocks and the root index, or just the root index.
940   */
941  public static class BlockIndexWriter implements InlineBlockWriter {
942    /**
943     * While the index is being written, this represents the current block
944     * index referencing all leaf blocks, with one exception. If the file is
945     * being closed and there are not enough blocks to complete even a single
946     * leaf block, no leaf blocks get written and this contains the entire
947     * block index. After all levels of the index were written by
948     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
949     * root-level index.
950     */
951    private BlockIndexChunk rootChunk = new BlockIndexChunk();
952
953    /**
954     * Current leaf-level chunk. New entries referencing data blocks get added
955     * to this chunk until it grows large enough to be written to disk.
956     */
957    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
958
959    /**
960     * The number of block index levels. This is one if there is only root
961     * level (even empty), two if there a leaf level and root level, and is
962     * higher if there are intermediate levels. This is only final after
963     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
964     * initial value accounts for the root level, and will be increased to two
965     * as soon as we find out there is a leaf-level in
966     * {@link #blockWritten(long, int, int)}.
967     */
968    private int numLevels = 1;
969
970    private HFileBlock.Writer blockWriter;
971    private byte[] firstKey = null;
972
973    /**
974     * The total number of leaf-level entries, i.e. entries referenced by
975     * leaf-level blocks. For the data block index this is equal to the number
976     * of data blocks.
977     */
978    private long totalNumEntries;
979
980    /** Total compressed size of all index blocks. */
981    private long totalBlockOnDiskSize;
982
983    /** Total uncompressed size of all index blocks. */
984    private long totalBlockUncompressedSize;
985
986    /** The maximum size guideline of all multi-level index blocks. */
987    private int maxChunkSize;
988
989    /** The maximum level of multi-level index blocks */
990    private int minIndexNumEntries;
991
992    /** Whether we require this block index to always be single-level. */
993    private boolean singleLevelOnly;
994
995    /** CacheConfig, or null if cache-on-write is disabled */
996    private CacheConfig cacheConf;
997
998    /** Name to use for computing cache keys */
999    private String nameForCaching;
1000
1001    /** Creates a single-level block index writer */
1002    public BlockIndexWriter() {
1003      this(null, null, null);
1004      singleLevelOnly = true;
1005    }
1006
1007    /**
1008     * Creates a multi-level block index writer.
1009     *
1010     * @param blockWriter the block writer to use to write index blocks
1011     * @param cacheConf used to determine when and how a block should be cached-on-write.
1012     */
1013    public BlockIndexWriter(HFileBlock.Writer blockWriter,
1014        CacheConfig cacheConf, String nameForCaching) {
1015      if ((cacheConf == null) != (nameForCaching == null)) {
1016        throw new IllegalArgumentException("Block cache and file name for " +
1017            "caching must be both specified or both null");
1018      }
1019
1020      this.blockWriter = blockWriter;
1021      this.cacheConf = cacheConf;
1022      this.nameForCaching = nameForCaching;
1023      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
1024      this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
1025    }
1026
1027    public void setMaxChunkSize(int maxChunkSize) {
1028      if (maxChunkSize <= 0) {
1029        throw new IllegalArgumentException("Invalid maximum index block size");
1030      }
1031      this.maxChunkSize = maxChunkSize;
1032    }
1033
1034    public void setMinIndexNumEntries(int minIndexNumEntries) {
1035      if (minIndexNumEntries <= 1) {
1036        throw new IllegalArgumentException("Invalid maximum index level, should be >= 2");
1037      }
1038      this.minIndexNumEntries = minIndexNumEntries;
1039    }
1040
1041    /**
1042     * Writes the root level and intermediate levels of the block index into
1043     * the output stream, generating the tree from bottom up. Assumes that the
1044     * leaf level has been inline-written to the disk if there is enough data
1045     * for more than one leaf block. We iterate by breaking the current level
1046     * of the block index, starting with the index of all leaf-level blocks,
1047     * into chunks small enough to be written to disk, and generate its parent
1048     * level, until we end up with a level small enough to become the root
1049     * level.
1050     *
1051     * If the leaf level is not large enough, there is no inline block index
1052     * anymore, so we only write that level of block index to disk as the root
1053     * level.
1054     *
1055     * @param out FSDataOutputStream
1056     * @return position at which we entered the root-level index.
1057     * @throws IOException
1058     */
1059    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
1060      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
1061        throw new IOException("Trying to write a multi-level block index, " +
1062            "but are " + curInlineChunk.getNumEntries() + " entries in the " +
1063            "last inline chunk.");
1064      }
1065
1066      // We need to get mid-key metadata before we create intermediate
1067      // indexes and overwrite the root chunk.
1068      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
1069          : null;
1070
1071      if (curInlineChunk != null) {
1072        while (rootChunk.getRootSize() > maxChunkSize
1073            // HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
1074            && rootChunk.getNumEntries() > minIndexNumEntries
1075            // Sanity check. We will not hit this (minIndexNumEntries ^ 16) blocks can be addressed
1076            && numLevels < 16) {
1077          rootChunk = writeIntermediateLevel(out, rootChunk);
1078          numLevels += 1;
1079        }
1080      }
1081
1082      // write the root level
1083      long rootLevelIndexPos = out.getPos();
1084
1085      {
1086        DataOutput blockStream =
1087            blockWriter.startWriting(BlockType.ROOT_INDEX);
1088        rootChunk.writeRoot(blockStream);
1089        if (midKeyMetadata != null)
1090          blockStream.write(midKeyMetadata);
1091        blockWriter.writeHeaderAndData(out);
1092        if (cacheConf != null) {
1093          cacheConf.getBlockCache().ifPresent(cache -> {
1094            HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1095            cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true,
1096                blockForCaching.getBlockType()), blockForCaching);
1097          });
1098        }
1099      }
1100
1101      // Add root index block size
1102      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1103      totalBlockUncompressedSize +=
1104          blockWriter.getUncompressedSizeWithoutHeader();
1105
1106      if (LOG.isTraceEnabled()) {
1107        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
1108          + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
1109          + " root-level entries, " + totalNumEntries + " total entries, "
1110          + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
1111          " on-disk size, "
1112          + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
1113          " total uncompressed size.");
1114      }
1115      return rootLevelIndexPos;
1116    }
1117
1118    /**
1119     * Writes the block index data as a single level only. Does not do any
1120     * block framing.
1121     *
1122     * @param out the buffered output stream to write the index to. Typically a
1123     *          stream writing into an {@link HFile} block.
1124     * @param description a short description of the index being written. Used
1125     *          in a log message.
1126     * @throws IOException
1127     */
1128    public void writeSingleLevelIndex(DataOutput out, String description)
1129        throws IOException {
1130      expectNumLevels(1);
1131
1132      if (!singleLevelOnly)
1133        throw new IOException("Single-level mode is turned off");
1134
1135      if (rootChunk.getNumEntries() > 0)
1136        throw new IOException("Root-level entries already added in " +
1137            "single-level mode");
1138
1139      rootChunk = curInlineChunk;
1140      curInlineChunk = new BlockIndexChunk();
1141
1142      if (LOG.isTraceEnabled()) {
1143        LOG.trace("Wrote a single-level " + description + " index with "
1144          + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
1145          + " bytes");
1146      }
1147      rootChunk.writeRoot(out);
1148    }
1149
1150    /**
1151     * Split the current level of the block index into intermediate index
1152     * blocks of permitted size and write those blocks to disk. Return the next
1153     * level of the block index referencing those intermediate-level blocks.
1154     *
1155     * @param out
1156     * @param currentLevel the current level of the block index, such as the a
1157     *          chunk referencing all leaf-level index blocks
1158     * @return the parent level block index, which becomes the root index after
1159     *         a few (usually zero) iterations
1160     * @throws IOException
1161     */
1162    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
1163        BlockIndexChunk currentLevel) throws IOException {
1164      // Entries referencing intermediate-level blocks we are about to create.
1165      BlockIndexChunk parent = new BlockIndexChunk();
1166
1167      // The current intermediate-level block index chunk.
1168      BlockIndexChunk curChunk = new BlockIndexChunk();
1169
1170      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
1171        curChunk.add(currentLevel.getBlockKey(i),
1172            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
1173
1174        // HBASE-16288: We have to have at least minIndexNumEntries(16) items in the index so that
1175        // we won't end up with too-many levels for a index with very large rowKeys. Also, if the
1176        // first key is larger than maxChunkSize this will cause infinite recursion.
1177        if (i >= minIndexNumEntries && curChunk.getRootSize() >= maxChunkSize) {
1178          writeIntermediateBlock(out, parent, curChunk);
1179        }
1180      }
1181
1182      if (curChunk.getNumEntries() > 0) {
1183        writeIntermediateBlock(out, parent, curChunk);
1184      }
1185
1186      return parent;
1187    }
1188
1189    private void writeIntermediateBlock(FSDataOutputStream out,
1190        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
1191      long beginOffset = out.getPos();
1192      DataOutputStream dos = blockWriter.startWriting(
1193          BlockType.INTERMEDIATE_INDEX);
1194      curChunk.writeNonRoot(dos);
1195      byte[] curFirstKey = curChunk.getBlockKey(0);
1196      blockWriter.writeHeaderAndData(out);
1197
1198      if (getCacheOnWrite()) {
1199        cacheConf.getBlockCache().ifPresent(cache -> {
1200          HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1201          cache.cacheBlock(
1202              new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
1203              blockForCaching);
1204        });
1205      }
1206
1207      // Add intermediate index block size
1208      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1209      totalBlockUncompressedSize +=
1210          blockWriter.getUncompressedSizeWithoutHeader();
1211
1212      // OFFSET is the beginning offset the chunk of block index entries.
1213      // SIZE is the total byte size of the chunk of block index entries
1214      // + the secondary index size
1215      // FIRST_KEY is the first key in the chunk of block index
1216      // entries.
1217      parent.add(curFirstKey, beginOffset,
1218          blockWriter.getOnDiskSizeWithHeader());
1219
1220      // clear current block index chunk
1221      curChunk.clear();
1222      curFirstKey = null;
1223    }
1224
1225    /**
1226     * @return how many block index entries there are in the root level
1227     */
1228    public final int getNumRootEntries() {
1229      return rootChunk.getNumEntries();
1230    }
1231
1232    /**
1233     * @return the number of levels in this block index.
1234     */
1235    public int getNumLevels() {
1236      return numLevels;
1237    }
1238
1239    private void expectNumLevels(int expectedNumLevels) {
1240      if (numLevels != expectedNumLevels) {
1241        throw new IllegalStateException("Number of block index levels is "
1242            + numLevels + "but is expected to be " + expectedNumLevels);
1243      }
1244    }
1245
1246    /**
1247     * Whether there is an inline block ready to be written. In general, we
1248     * write an leaf-level index block as an inline block as soon as its size
1249     * as serialized in the non-root format reaches a certain threshold.
1250     */
1251    @Override
1252    public boolean shouldWriteBlock(boolean closing) {
1253      if (singleLevelOnly) {
1254        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1255      }
1256
1257      if (curInlineChunk == null) {
1258        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
1259            "called with closing=true and then called again?");
1260      }
1261
1262      if (curInlineChunk.getNumEntries() == 0) {
1263        return false;
1264      }
1265
1266      // We do have some entries in the current inline chunk.
1267      if (closing) {
1268        if (rootChunk.getNumEntries() == 0) {
1269          // We did not add any leaf-level blocks yet. Instead of creating a
1270          // leaf level with one block, move these entries to the root level.
1271
1272          expectNumLevels(1);
1273          rootChunk = curInlineChunk;
1274          curInlineChunk = null;  // Disallow adding any more index entries.
1275          return false;
1276        }
1277
1278        return true;
1279      } else {
1280        return curInlineChunk.getNonRootSize() >= maxChunkSize;
1281      }
1282    }
1283
1284    /**
1285     * Write out the current inline index block. Inline blocks are non-root
1286     * blocks, so the non-root index format is used.
1287     *
1288     * @param out
1289     */
1290    @Override
1291    public void writeInlineBlock(DataOutput out) throws IOException {
1292      if (singleLevelOnly)
1293        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1294
1295      // Write the inline block index to the output stream in the non-root
1296      // index block format.
1297      curInlineChunk.writeNonRoot(out);
1298
1299      // Save the first key of the inline block so that we can add it to the
1300      // parent-level index.
1301      firstKey = curInlineChunk.getBlockKey(0);
1302
1303      // Start a new inline index block
1304      curInlineChunk.clear();
1305    }
1306
1307    /**
1308     * Called after an inline block has been written so that we can add an
1309     * entry referring to that block to the parent-level index.
1310     */
1311    @Override
1312    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
1313      // Add leaf index block size
1314      totalBlockOnDiskSize += onDiskSize;
1315      totalBlockUncompressedSize += uncompressedSize;
1316
1317      if (singleLevelOnly)
1318        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1319
1320      if (firstKey == null) {
1321        throw new IllegalStateException("Trying to add second-level index " +
1322            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
1323            "but the first key was not set in writeInlineBlock");
1324      }
1325
1326      if (rootChunk.getNumEntries() == 0) {
1327        // We are writing the first leaf block, so increase index level.
1328        expectNumLevels(1);
1329        numLevels = 2;
1330      }
1331
1332      // Add another entry to the second-level index. Include the number of
1333      // entries in all previous leaf-level chunks for mid-key calculation.
1334      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1335      firstKey = null;
1336    }
1337
1338    @Override
1339    public BlockType getInlineBlockType() {
1340      return BlockType.LEAF_INDEX;
1341    }
1342
1343    /**
1344     * Add one index entry to the current leaf-level block. When the leaf-level
1345     * block gets large enough, it will be flushed to disk as an inline block.
1346     *
1347     * @param firstKey the first key of the data block
1348     * @param blockOffset the offset of the data block
     * @param blockDataSize the on-disk size of the data block ({@link HFile}
     *          format version 2), or the uncompressed size of the data block
     *          ({@link HFile} format version 1).
1352     */
1353    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
1354      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1355      ++totalNumEntries;
1356    }
1357
1358    /**
1359     * @throws IOException if we happened to write a multi-level index.
1360     */
1361    public void ensureSingleLevel() throws IOException {
1362      if (numLevels > 1) {
        throw new IOException("Wrote a " + numLevels + "-level index with " +
1364            rootChunk.getNumEntries() + " root-level entries, but " +
1365            "this is expected to be a single-level block index.");
1366      }
1367    }
1368
1369    /**
     * @return true if we are using cache-on-write. This is determined by the
     *         cache configuration passed to the constructor: true only if a
     *         cache configuration was supplied and it enables caching index
     *         blocks on write.
1373     */
1374    @Override
1375    public boolean getCacheOnWrite() {
1376      return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1377    }
1378
1379    /**
1380     * The total uncompressed size of the root index block, intermediate-level
1381     * index blocks, and leaf-level index blocks.
1382     *
1383     * @return the total uncompressed size of all index blocks
1384     */
1385    public long getTotalUncompressedSize() {
1386      return totalBlockUncompressedSize;
1387    }
1388
1389  }
1390
1391  /**
1392   * A single chunk of the block index in the process of writing. The data in
1393   * this chunk can become a leaf-level, intermediate-level, or root index
1394   * block.
1395   */
1396  static class BlockIndexChunk {
1397
1398    /** First keys of the key range corresponding to each index entry. */
1399    private final List<byte[]> blockKeys = new ArrayList<>();
1400
1401    /** Block offset in backing stream. */
1402    private final List<Long> blockOffsets = new ArrayList<>();
1403
1404    /** On-disk data sizes of lower-level data or index blocks. */
1405    private final List<Integer> onDiskDataSizes = new ArrayList<>();
1406
1407    /**
     * The cumulative number of sub-entries, i.e. the total number of entries
     * in the deeper-level blocks referenced by this chunk's entries.
     * numSubEntriesAt[i] is the number of sub-entries in the blocks
     * corresponding to this chunk's entries #0 through #i inclusively.
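     *
     * For example (purely illustrative numbers): if this chunk's three entries
     * point to leaf blocks holding 10, 20 and 15 sub-entries respectively,
     * numSubEntriesAt would be [10, 30, 45].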
1411     */
1412    private final List<Long> numSubEntriesAt = new ArrayList<>();
1413
1414    /**
1415     * The offset of the next entry to be added, relative to the end of the
1416     * "secondary index" in the "non-root" format representation of this index
1417     * chunk. This is the next value to be added to the secondary index.
1418     */
1419    private int curTotalNonRootEntrySize = 0;
1420
1421    /**
1422     * The accumulated size of this chunk if stored in the root index format.
1423     */
1424    private int curTotalRootSize = 0;
1425
1426    /**
1427     * The "secondary index" used for binary search over variable-length
1428     * records in a "non-root" format block. These offsets are relative to the
1429     * end of this secondary index.
1430     */
1431    private final List<Integer> secondaryIndexOffsetMarks = new ArrayList<>();
1432
1433    /**
1434     * Adds a new entry to this block index chunk.
1435     *
1436     * @param firstKey the first key in the block pointed to by this entry
1437     * @param blockOffset the offset of the next-level block pointed to by this
1438     *          entry
     * @param onDiskDataSize the on-disk data size of the block pointed to by
     *          this entry, including header size
1441     * @param curTotalNumSubEntries if this chunk is the root index chunk under
1442     *          construction, this specifies the current total number of
1443     *          sub-entries in all leaf-level chunks, including the one
1444     *          corresponding to the second-level entry being added.
1445     */
1446    void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1447        long curTotalNumSubEntries) {
1448      // Record the offset for the secondary index
1449      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1450      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1451          + firstKey.length;
1452
1453      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1454          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1455
1456      blockKeys.add(firstKey);
1457      blockOffsets.add(blockOffset);
1458      onDiskDataSizes.add(onDiskDataSize);
1459
1460      if (curTotalNumSubEntries != -1) {
1461        numSubEntriesAt.add(curTotalNumSubEntries);
1462
1463        // Make sure the parallel arrays are in sync.
1464        if (numSubEntriesAt.size() != blockKeys.size()) {
1465          throw new IllegalStateException("Only have key/value count " +
1466              "stats for " + numSubEntriesAt.size() + " block index " +
1467              "entries out of " + blockKeys.size());
1468        }
1469      }
1470    }
1471
1472    /**
     * The same as {@link #add(byte[], long, int, long)} but does not take the
     * sub-entry (key/value) count into account. Used for single-level indexes.
1475     *
1476     * @see #add(byte[], long, int, long)
1477     */
1478    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1479      add(firstKey, blockOffset, onDiskDataSize, -1);
1480    }
1481
1482    public void clear() {
1483      blockKeys.clear();
1484      blockOffsets.clear();
1485      onDiskDataSizes.clear();
1486      secondaryIndexOffsetMarks.clear();
1487      numSubEntriesAt.clear();
1488      curTotalNonRootEntrySize = 0;
1489      curTotalRootSize = 0;
1490    }
1491
1492    /**
1493     * Finds the entry corresponding to the deeper-level index block containing
1494     * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1495     * ordering of sub-entries.
1496     *
1497     * <p>
     * <i>Implementation note.</i> We are looking for i such that
     * {@code numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i]}, because a
     * deeper-level block #i (0-based) contains sub-entries
     * #numSubEntriesAt[i - 1] through #(numSubEntriesAt[i] - 1), assuming a
     * global 0-based ordering of sub-entries. i is by definition the insertion
     * point of k in numSubEntriesAt.
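     *
     * For example (illustrative values only): with numSubEntriesAt = [10, 30, 45],
     * sub-entry k = 12 falls in entry #1, because
     * {@code numSubEntriesAt[0] = 10 <= 12 < 30 = numSubEntriesAt[1]}.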
1504     *
     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1506     * @return the 0-based index of the entry corresponding to the given
1507     *         sub-entry
1508     */
1509    public int getEntryBySubEntry(long k) {
1510      // We define mid-key as the key corresponding to k'th sub-entry
1511      // (0-based).
1512
1513      int i = Collections.binarySearch(numSubEntriesAt, k);
1514
      // Exact match: numSubEntriesAt[i] = k. This means chunks #0 through
1516      // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1517      // is in the (i + 1)'th chunk.
      if (i >= 0) {
        return i + 1;
      }
1520
1521      // Inexact match. Return the insertion point.
1522      return -i - 1;
1523    }
1524
1525    /**
     * Used when writing the root block index of a multi-level block index.
     * Serializes additional information that allows the mid-key to be
     * identified efficiently.
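     *
     * The serialized metadata consists of the offset (long) and on-disk size
     * (int) of the leaf-level block containing the mid-key, followed by the
     * mid-key's index within that block (int), matching the writes below.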
1529     *
1530     * @return a few serialized fields for finding the mid-key
     * @throws IOException if the metadata for computing the mid-key could not
     *           be created
1532     */
1533    public byte[] getMidKeyMetadata() throws IOException {
1534      ByteArrayOutputStream baos = new ByteArrayOutputStream(
1535          MID_KEY_METADATA_SIZE);
1536      DataOutputStream baosDos = new DataOutputStream(baos);
1537      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
1538      if (totalNumSubEntries == 0) {
1539        throw new IOException("No leaf-level entries, mid-key unavailable");
1540      }
1541      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
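      // E.g. (illustrative): with 7 sub-entries in total, the line above picks
      // sub-entry #3 ((7 - 1) / 2) as the mid-key in the global 0-based ordering.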
1542      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1543
1544      baosDos.writeLong(blockOffsets.get(midKeyEntry));
1545      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1546
1547      long numSubEntriesBefore = midKeyEntry > 0
1548          ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1549      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE) {
1552        throw new IOException("Could not identify mid-key index within the "
1553            + "leaf-level block containing mid-key: out of range ("
1554            + subEntryWithinEntry + ", numSubEntriesBefore="
1555            + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1556            + ")");
1557      }
1558
1559      baosDos.writeInt((int) subEntryWithinEntry);
1560
1561      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1562        throw new IOException("Could not write mid-key metadata: size=" +
1563            baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1564      }
1565
1566      // Close just to be good citizens, although this has no effect.
1567      baos.close();
1568
1569      return baos.toByteArray();
1570    }
1571
1572    /**
1573     * Writes the block index chunk in the non-root index block format. This
1574     * format contains the number of entries, an index of integer offsets
1575     * for quick binary search on variable-length records, and tuples of
1576     * block offset, on-disk block size, and the first key for each entry.
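     *
     * The serialized layout, matching the writes below, is:
     * <pre>
     *   int                    number of entries
     *   int[numEntries + 1]    "secondary index": offset of each entry relative
     *                          to the end of this secondary index, plus one
     *                          trailing offset marking the end of the entries
     *   for each entry:        long block offset, int on-disk size, first key bytes
     * </pre>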
1577     *
     * @param out the data output stream to write the block index chunk to
     * @throws IOException if writing to the output stream fails
1580     */
1581    void writeNonRoot(DataOutput out) throws IOException {
1582      // The number of entries in the block.
1583      out.writeInt(blockKeys.size());
1584
1585      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1586        throw new IOException("Corrupted block index chunk writer: " +
1587            blockKeys.size() + " entries but " +
1588            secondaryIndexOffsetMarks.size() + " secondary index items");
1589      }
1590
1591      // For each entry, write a "secondary index" of relative offsets to the
1592      // entries from the end of the secondary index. This works, because at
1593      // read time we read the number of entries and know where the secondary
1594      // index ends.
      for (int currentSecondaryIndex : secondaryIndexOffsetMarks) {
        out.writeInt(currentSecondaryIndex);
      }
1597
      // We include one extra element in the secondary index so that the size
      // of each entry can be computed by subtracting adjacent secondary index
      // offsets.
1600      out.writeInt(curTotalNonRootEntrySize);
1601
1602      for (int i = 0; i < blockKeys.size(); ++i) {
1603        out.writeLong(blockOffsets.get(i));
1604        out.writeInt(onDiskDataSizes.get(i));
1605        out.write(blockKeys.get(i));
1606      }
1607    }
1608
1609    /**
1610     * @return the size of this chunk if stored in the non-root index block
1611     *         format
1612     */
1613    int getNonRootSize() {
1614      return Bytes.SIZEOF_INT                          // Number of entries
1615          + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1616          + curTotalNonRootEntrySize;                  // All entries
1617    }
1618
1619    /**
1620     * Writes this chunk into the given output stream in the root block index
1621     * format. This format is similar to the {@link HFile} version 1 block
     * index format, except that we store the on-disk size of the block instead
     * of its uncompressed size.
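     *
     * Each entry is written as a long block offset, an int on-disk size, and
     * the first key as a vint-length-prefixed byte array
     * (via {@link Bytes#writeByteArray(DataOutput, byte[])}).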
1624     *
1625     * @param out the data output stream to write the block index to. Typically
1626     *          a stream writing into an {@link HFile} block.
     * @throws IOException if writing to the output stream fails
1628     */
1629    void writeRoot(DataOutput out) throws IOException {
1630      for (int i = 0; i < blockKeys.size(); ++i) {
1631        out.writeLong(blockOffsets.get(i));
1632        out.writeInt(onDiskDataSizes.get(i));
1633        Bytes.writeByteArray(out, blockKeys.get(i));
1634      }
1635    }
1636
1637    /**
1638     * @return the size of this chunk if stored in the root index block format
1639     */
1640    int getRootSize() {
1641      return curTotalRootSize;
1642    }
1643
1644    /**
1645     * @return the number of entries in this block index chunk
1646     */
1647    public int getNumEntries() {
1648      return blockKeys.size();
1649    }
1650
1651    public byte[] getBlockKey(int i) {
1652      return blockKeys.get(i);
1653    }
1654
1655    public long getBlockOffset(int i) {
1656      return blockOffsets.get(i);
1657    }
1658
1659    public int getOnDiskDataSize(int i) {
1660      return onDiskDataSizes.get(i);
1661    }
1662
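    /**
     * @param i entry index; may be negative
     * @return the cumulative number of sub-entries (key/value pairs) in this
     *         chunk's entries #0 through #i, or 0 if i is negative
     */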
1663    public long getCumulativeNumKV(int i) {
      if (i < 0) {
        return 0;
      }
1666      return numSubEntriesAt.get(i);
1667    }
1668
1669  }
1670
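  /**
   * @param conf the configuration to read from
   * @return the configured maximum index block size in bytes
   *         ({@link #MAX_CHUNK_SIZE_KEY}), or {@link #DEFAULT_MAX_CHUNK_SIZE}
   *         if not set
   */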
1671  public static int getMaxChunkSize(Configuration conf) {
1672    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1673  }
1674
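  /**
   * @param conf the configuration to read from
   * @return the configured minimum number of entries per index block
   *         ({@link #MIN_INDEX_NUM_ENTRIES_KEY}), or
   *         {@link #DEFAULT_MIN_INDEX_NUM_ENTRIES} if not set
   */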
1675  public static int getMinIndexNumEntries(Configuration conf) {
1676    return conf.getInt(MIN_INDEX_NUM_ENTRIES_KEY, DEFAULT_MIN_INDEX_NUM_ENTRIES);
1677  }
1678}