/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * Provides functionality to write ({@link BlockIndexWriter}) and read
 * ({@link BlockIndexReader}) single-level and multi-level block indexes.
 *
 * Examples of how to use the block index writer can be found in
 * {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
 * {@link HFileWriterImpl}. Examples of how to use the reader can be
 * found in {@link HFileReaderImpl} and
 * org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex.
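 *
 * <p>A minimal writer-side sketch (illustrative only; the variable names are
 * placeholders, see the classes above for complete, working usage):
 * <pre>{@code
 * BlockIndexWriter indexWriter =
 *     new BlockIndexWriter(blockWriter, cacheConf, fileName);
 * // For every data block written, index its first key and location.
 * indexWriter.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
 * // After the last data block, write the intermediate/root levels and
 * // remember where the root-level index starts.
 * long rootIndexOffset = indexWriter.writeIndexBlocks(outputStream);
 * }</pre>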
 */
@InterfaceAudience.Private
public class HFileBlockIndex {

  private static final Logger LOG = LoggerFactory.getLogger(HFileBlockIndex.class);

  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;

  /**
   * The maximum size guideline for index blocks (leaf, intermediate, and
   * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
   */
  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";

  /**
   * Minimum number of entries in a single index block. Even if we are above the
   * hfile.index.block.max.size we will keep writing to the same block unless we have that many
   * entries. We should have at least a few entries so that we don't have too many levels in the
   * multi-level index. This should be at least 2 to make sure there is no infinite recursion.
   */
  public static final String MIN_INDEX_NUM_ENTRIES_KEY = "hfile.index.block.min.entries";

  static final int DEFAULT_MIN_INDEX_NUM_ENTRIES = 16;
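
  // A sketch of overriding the defaults above through the HBase configuration
  // (the values here are illustrative only):
  //   conf.setInt(MAX_CHUNK_SIZE_KEY, 64 * 1024);
  //   conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 4);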

  /**
   * The number of bytes stored in each "secondary index" entry in addition to
   * key bytes in the non-root index block format. The first long is the file
   * offset of the deeper-level block the entry points to, and the int that
   * follows is that block's on-disk size without including header.
   */
  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
      + Bytes.SIZEOF_LONG;

  /**
   * Error message when trying to use inline block API in single-level mode.
   */
  private static final String INLINE_BLOCKS_NOT_ALLOWED =
      "Inline blocks are not allowed in the single-level-only mode";

  /**
   * The size of a meta-data record used for finding the mid-key in a
   * multi-level index. Consists of the middle leaf-level index block offset
   * (long), its on-disk size without header included (int), and the mid-key
   * entry's zero-based index in that leaf index block.
   */
  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
      2 * Bytes.SIZEOF_INT;

  /**
   * An implementation of the BlockIndexReader that deals with block keys which are plain
   * byte[], like the keys of the meta block or the Bloom block for ROW blooms.
   * It does not need a comparator; it can work with Bytes.BYTES_RAWCOMPARATOR.
   */
  static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {

    private byte[][] blockKeys;

    public ByteArrayKeyBlockIndexReader(final int treeLevel,
        final CachingBlockReader cachingBlockReader) {
      this(treeLevel);
      this.cachingBlockReader = cachingBlockReader;
    }

    public ByteArrayKeyBlockIndexReader(final int treeLevel) {
      // Can be null for METAINDEX block
      searchTreeLevel = treeLevel;
    }

    @Override
    protected long calculateHeapSizeForBlockKeys(long heapSize) {
      // Calculating the size of blockKeys
      if (blockKeys != null) {
        heapSize += ClassSize.REFERENCE;
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);

        // Adding bytes
        for (byte[] key : blockKeys) {
          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
        }
      }
      return heapSize;
    }

    @Override
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public byte[] getRootBlockKey(int i) {
      return blockKeys[i];
    }

    @Override
    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks, boolean pread, boolean isCompaction,
        DataBlockEncoding expectedDataBlockEncoding) throws IOException {
      // Not needed by this implementation
      return null;
    }

    @Override
    public Cell midkey() throws IOException {
      // Not needed here
      return null;
    }

    @Override
    protected void initialize(int numEntries) {
      blockKeys = new byte[numEntries][];
    }

    @Override
    protected void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      blockKeys[rootCount] = key;
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    @Override
    public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
      int pos = Bytes.binarySearch(blockKeys, key, offset, length);
      // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    @Override
    public int rootBlockContainingKey(Cell key) {
      // Should not be called on this because here it deals only with byte[]
      throw new UnsupportedOperationException(
          "Cannot search for a key that is of Cell type. Only plain byte array keys " +
          "can be searched for");
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=").append(rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
            .append("\n  offset=").append(blockOffsets[i])
            .append(", dataSize=").append(blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }

  }

  /**
   * An implementation of the BlockIndexReader that deals with block keys which are the key
   * part of a cell, like the data block index or the ROW_COL Bloom blocks.
   * This needs a comparator to work with the Cells.
   */
  static class CellBasedKeyBlockIndexReader extends BlockIndexReader {

    private Cell[] blockKeys;
    /** Pre-computed mid-key */
    private AtomicReference<Cell> midKey = new AtomicReference<>();
    /** Needed for doing lookups on blocks. */
    private CellComparator comparator;

    public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel,
        final CachingBlockReader cachingBlockReader) {
      this(c, treeLevel);
      this.cachingBlockReader = cachingBlockReader;
    }

    public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
      // Can be null for METAINDEX block
      comparator = c;
      searchTreeLevel = treeLevel;
    }

    @Override
    protected long calculateHeapSizeForBlockKeys(long heapSize) {
      if (blockKeys != null) {
        heapSize += ClassSize.REFERENCE;
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);

        // Adding blockKeys
        for (Cell key : blockKeys) {
          heapSize += ClassSize.align(PrivateCellUtil.estimatedSizeOfCell(key));
        }
      }
      // Add comparator and the midKey AtomicReference
      heapSize += 2 * ClassSize.REFERENCE;
      return heapSize;
    }

    @Override
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public Cell getRootBlockKey(int i) {
      return blockKeys[i];
    }

    @Override
    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks, boolean pread, boolean isCompaction,
        DataBlockEncoding expectedDataBlockEncoding) throws IOException {
      int rootLevelIndex = rootBlockContainingKey(key);
      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
        return null;
      }

      // the next indexed key
      Cell nextIndexedKey = null;

      // Read the next-level (intermediate or leaf) index block.
      long currentOffset = blockOffsets[rootLevelIndex];
      int currentOnDiskSize = blockDataSizes[rootLevelIndex];

      if (rootLevelIndex < blockKeys.length - 1) {
        nextIndexedKey = blockKeys[rootLevelIndex + 1];
      } else {
        nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
      }

      int lookupLevel = 1; // How many levels deep we are in our lookup.
      int index = -1;

      HFileBlock block = null;
      boolean dataBlock = false;
      KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
      while (true) {
        try {
          if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
            // Avoid reading the same block again, even with caching turned off.
            // This is crucial for compaction-type workload which might have
            // caching turned off. This is like a one-block cache inside the
            // scanner.
            block = currentBlock;
          } else {
            // Call HFile's caching block reader API. We always cache index
            // blocks, otherwise we might get terrible performance.
            boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
            BlockType expectedBlockType;
            if (lookupLevel < searchTreeLevel - 1) {
              expectedBlockType = BlockType.INTERMEDIATE_INDEX;
            } else if (lookupLevel == searchTreeLevel - 1) {
              expectedBlockType = BlockType.LEAF_INDEX;
            } else {
              // this also accounts for ENCODED_DATA
              expectedBlockType = BlockType.DATA;
            }
            block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache,
                pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
          }

          if (block == null) {
            throw new IOException("Failed to read block at offset " + currentOffset
                + ", onDiskSize=" + currentOnDiskSize);
          }

          // Found a data block, break the loop and check our level in the tree.
          if (block.getBlockType().isData()) {
            dataBlock = true;
            break;
          }

          // Not a data block. This must be a leaf-level or intermediate-level
          // index block. We don't allow going deeper than searchTreeLevel.
          if (++lookupLevel > searchTreeLevel) {
            throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
                + ", searchTreeLevel=" + searchTreeLevel);
          }

          // Locate the entry corresponding to the given key in the non-root
          // (leaf or intermediate-level) index block.
          ByteBuff buffer = block.getBufferWithoutHeader();
          index = locateNonRootIndexEntry(buffer, key, comparator);
          if (index == -1) {
            // This has to be changed
            // For now change this to key value
            throw new IOException("The key " + CellUtil.getCellKeyAsString(key)
                + " is before the first key of the non-root index block " + block);
          }

          currentOffset = buffer.getLong();
          currentOnDiskSize = buffer.getInt();

          // Only update next indexed key if there is a next indexed key in the current level
          byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
          if (nonRootIndexedKey != null) {
            tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
            nextIndexedKey = tmpNextIndexKV;
          }
        } finally {
          if (!dataBlock) {
            // Return the block immediately if it is not the data block
            cachingBlockReader.returnBlock(block);
          }
        }
      }

      if (lookupLevel != searchTreeLevel) {
        assert dataBlock;
        // Though we have retrieved a data block, we found it at the wrong
        // level in the tree. Return the block so that its ref count can be
        // decremented before failing.
        cachingBlockReader.returnBlock(block);
        throw new IOException("Reached a data block at level " + lookupLevel +
            " but the number of levels is " + searchTreeLevel);
      }

      // Set the next indexed key for the current block.
      return new BlockWithScanInfo(block, nextIndexedKey);
    }

    @Override
    public Cell midkey() throws IOException {
      if (rootCount == 0) {
        throw new IOException("HFile empty");
      }

      Cell targetMidKey = this.midKey.get();
      if (targetMidKey != null) {
        return targetMidKey;
      }

      if (midLeafBlockOffset >= 0) {
        if (cachingBlockReader == null) {
          throw new IOException("Have to read the middle leaf block but " +
              "no block reader available");
        }

        // Caching, using pread, assuming this is not a compaction.
        HFileBlock midLeafBlock = cachingBlockReader.readBlock(
            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
            BlockType.LEAF_INDEX, null);
        try {
          ByteBuff b = midLeafBlock.getBufferWithoutHeader();
          int numDataBlocks = b.getIntAfterPosition(0);
          int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
          int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset
              - SECONDARY_INDEX_ENTRY_OVERHEAD;
          int keyOffset =
              Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
                  + SECONDARY_INDEX_ENTRY_OVERHEAD;
          byte[] bytes = b.toBytes(keyOffset, keyLen);
          targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
        } finally {
          cachingBlockReader.returnBlock(midLeafBlock);
        }
      } else {
        // The middle of the root-level index.
        targetMidKey = blockKeys[rootCount / 2];
      }

      this.midKey.set(targetMidKey);
      return targetMidKey;
    }

    @Override
    protected void initialize(int numEntries) {
      blockKeys = new Cell[numEntries];
    }

    /**
     * Adds a new entry in the root block index. Only used when reading.
     *
     * @param key Last key in the block
     * @param offset file offset where the block is stored
     * @param dataSize the uncompressed data size
     */
    @Override
    protected void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      // Create the blockKeys as Cells once when the reader is opened
      blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    @Override
    public int rootBlockContainingKey(final byte[] key, int offset, int length,
        CellComparator comp) {
      // This should always be called with a Cell, not with a byte[] key.
      throw new UnsupportedOperationException("Cannot search for a key that is a plain byte " +
          "array. Only Cell-based keys can be searched for");
    }

    @Override
    public int rootBlockContainingKey(Cell key) {
      // Here the comparator should not be null as this happens for the root-level block
      int pos = Bytes.binarySearch(blockKeys, key, comparator);
      // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=").append(rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(blockKeys[i])
            .append("\n  offset=").append(blockOffsets[i])
            .append(", dataSize=").append(blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }
  }

  /**
   * The reader will always hold the root level index in the memory. Index
   * blocks at all other levels will be cached in the LRU cache in practice,
   * although this API does not enforce that.
   *
   * <p>All non-root (leaf and intermediate) index blocks contain what we call a
   * "secondary index": an array of offsets to the entries within the block.
   * This allows us to do binary search for the entry corresponding to the
   * given key without having to deserialize the block.
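   *
   * <p>The on-disk layout of a non-root index block (after the block header),
   * as implied by the offset arithmetic in this class, is:
   * <pre>
   *   int               numEntries
   *   int[numEntries+1] secondary index: byte offsets of the entries relative
   *                     to the end of the secondary index; the last element
   *                     is the total size of all entries
   *   entries           each of the form (long blockOffset, int onDiskSize,
   *                     byte[] firstKey)
   * </pre>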
   */
  static abstract class BlockIndexReader implements HeapSize {

    protected long[] blockOffsets;
    protected int[] blockDataSizes;
    protected int rootCount = 0;

    // Mid-key metadata.
    protected long midLeafBlockOffset = -1;
    protected int midLeafBlockOnDiskSize = -1;
    protected int midKeyEntry = -1;

    /**
     * The number of levels in the block index tree. One if there is only root
     * level, two for root and leaf levels, etc.
     */
    protected int searchTreeLevel;

    /** A way to read {@link HFile} blocks at a given offset */
    protected CachingBlockReader cachingBlockReader;

    /**
     * @return true if the block index is empty.
     */
    public abstract boolean isEmpty();

    /**
     * Verifies that the block index is non-empty and throws an
     * {@link IllegalStateException} otherwise.
     */
    public void ensureNonEmpty() {
      if (isEmpty()) {
        throw new IllegalStateException("Block index is empty or not loaded");
      }
    }

    /**
     * Return the data block which contains this key. This function will only
     * be called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param cacheBlocks whether the blocks read should be added to the block cache
     * @param pread whether to use positional read
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding
     * @return the data block in which the given key resides, or null if not found
     * @throws IOException
     */
    public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException {
      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
          cacheBlocks, pread, isCompaction, expectedDataBlockEncoding);
      if (blockWithScanInfo == null) {
        return null;
      } else {
        return blockWithScanInfo.getHFileBlock();
      }
    }

    /**
     * Return the BlockWithScanInfo, a data structure which contains the Data HFileBlock with
     * other scan info such as the key that starts the next HFileBlock. This function will only
     * be called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding.
     * @return the BlockWithScanInfo which contains the DataBlock with other
     *         scan info such as nextIndexedKey.
     * @throws IOException
     */
    public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException;

    /**
     * An approximation to the {@link HFile}'s mid-key. Operates on block
     * boundaries, and does not go inside blocks. In other words, returns the
     * first key of the middle block of the file.
     *
     * @return the first key of the middle block
     */
    public abstract Cell midkey() throws IOException;

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public long getRootBlockOffset(int i) {
      return blockOffsets[i];
    }

    /**
     * @param i zero-based index of a root-level block
     * @return the on-disk size of the root-level block for version 2, or the
     *         uncompressed size for version 1
     */
    public int getRootBlockDataSize(int i) {
      return blockDataSizes[i];
    }

    /**
     * @return the number of root-level blocks in this block index
     */
    public int getRootBlockCount() {
      return rootCount;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key Key to find
     * @param comp the comparator to be used
     * @return Index of the root-level block containing <code>key</code>
     *         (between 0 and the number of blocks - 1) or -1 if this file
     *         does not contain the request.
     */
    // When we want to find the meta index block or bloom block for ROW bloom
    // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
    // CellComparator.
    public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
        CellComparator comp);

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key Key to find
     * @return Index of the root-level block containing <code>key</code>
     *         (between 0 and the number of blocks - 1) or -1 if this file
     *         does not contain the request.
     */
    // When we want to find the meta index block or bloom block for ROW bloom
    // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom
    // case we need the CellComparator.
    public int rootBlockContainingKey(final byte[] key, int offset, int length) {
      return rootBlockContainingKey(key, offset, length, null);
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key Key to find
     */
    public abstract int rootBlockContainingKey(final Cell key);

    /**
     * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
     * @param nonRootIndex the non-root index block buffer
     * @param i the ith position
     * @return The indexed key at the ith position in the nonRootIndex.
     */
    protected byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
      int numEntries = nonRootIndex.getInt(0);
      if (i < 0 || i >= numEntries) {
        return null;
      }

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
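      // For example, with numEntries = 3 the secondary index occupies the four
      // ints at byte offsets 4, 8, 12 and 16, so the entries themselves start
      // at byte offset 4 * (3 + 2) = 20.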
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
      // Targetkey's offset relative to the end of secondary index
      int targetKeyRelOffset = nonRootIndex.getInt(
          Bytes.SIZEOF_INT * (i + 1));

      // The offset of the target key in the blockIndex buffer
      int targetKeyOffset = entriesOffset     // Skip secondary index
          + targetKeyRelOffset               // Skip all entries until mid
          + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

      // We subtract the two consecutive secondary index elements, which
      // gives us the size of the whole (offset, onDiskSize, key) tuple. We
      // then need to subtract the overhead of offset and onDiskSize.
      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
        targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

      // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
      return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
    }

    /**
     * Performs a binary search over a non-root level index block. Utilizes the
     * secondary index, which records the offsets of (offset, onDiskSize,
     * firstKey) tuples of all entries.
     *
     * @param key the key we are searching for offsets to individual entries in
     *          the blockIndex buffer
     * @param nonRootIndex the non-root index block buffer, starting with the
     *          secondary index. The position is ignored.
     * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
     *         keys[i + 1], if keys is the array of all keys being searched, or
     *         -1 otherwise
     * @throws IOException
     */
    static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
        CellComparator comparator) {

      int numEntries = nonRootIndex.getIntAfterPosition(0);
      int low = 0;
      int high = numEntries - 1;
      int mid = 0;

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // If we imagine that keys[-1] = -Infinity and
      // keys[numEntries] = Infinity, then we are maintaining an invariant that
      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
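      // For example, if the key is smaller than keys[0], every comparison
      // pushes high downward until low = 0 and high = -1, and we fall through
      // to return low - 1 = -1 below.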
      ByteBufferKeyOnlyKeyValue nonRootIndexkeyOnlyKV = new ByteBufferKeyOnlyKeyValue();
      ObjectIntPair<ByteBuffer> pair = new ObjectIntPair<>();
      while (low <= high) {
        mid = (low + high) >>> 1;

        // Midkey's offset relative to the end of secondary index
        int midKeyRelOffset = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 1));

        // The offset of the middle key in the blockIndex buffer
        int midKeyOffset = entriesOffset       // Skip secondary index
            + midKeyRelOffset                  // Skip all entries until mid
            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

        // We subtract the two consecutive secondary index elements, which
        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
        // then need to subtract the overhead of offset and onDiskSize.
        int midLength = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 2)) -
            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

        // we have to compare in this order, because the comparator order
        // has special logic when the 'left side' is a special key.
        // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
        // done after HBASE-12224 & HBASE-12282
        // TODO avoid array call.
        nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
        nonRootIndexkeyOnlyKV.setKey(pair.getFirst(), pair.getSecond(), midLength);
        int cmp = PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, nonRootIndexkeyOnlyKV);

        if (cmp > 0) {
          // key lives above the midpoint
          low = mid + 1; // Maintain the invariant that keys[low - 1] < key
        } else if (cmp < 0) {
          // key lives below the midpoint
          high = mid - 1; // Maintain the invariant that key < keys[high + 1]
        } else {
          return mid; // exact match
        }
      }

      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
      // condition, low >= high + 1. Therefore, low = high + 1.

      if (low != high + 1) {
        throw new IllegalStateException("Binary search broken: low=" + low
            + " instead of " + (high + 1));
      }

      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
      int i = low - 1;

      // Some extra validation on the result.
      if (i < -1 || i >= numEntries) {
        throw new IllegalStateException("Binary search broken: result is " +
            i + " but expected to be between -1 and (numEntries - 1) = " +
            (numEntries - 1));
      }

      return i;
    }

    /**
     * Search for one key using the secondary index in a non-root block. In case
     * of success, positions the provided buffer at the entry of interest, where
     * the file offset and the on-disk-size can be read.
     *
     * @param nonRootBlock a non-root block without header. Initial position
     *          does not matter.
     * @param key the byte array containing the key
     * @return the index position where the given key was found, otherwise
     *         return -1 in the case the given key is before the first key.
     */
    static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
        CellComparator comparator) {
      int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);

      if (entryIndex != -1) {
        int numEntries = nonRootBlock.getIntAfterPosition(0);

        // The end of secondary index and the beginning of entries themselves.
        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

        // The offset of the entry we are interested in relative to the end of
        // the secondary index.
        int entryRelOffset = nonRootBlock
            .getIntAfterPosition(Bytes.SIZEOF_INT * (1 + entryIndex));

        nonRootBlock.position(entriesOffset + entryRelOffset);
      }

      return entryIndex;
    }

    /**
     * Read in the root-level index from the given input stream. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
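     * <p>Each root-level entry is laid out as (long blockOffset, int dataSize,
     * vint-length-prefixed key), matching the read loop below.
     *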
     * @param in the buffered input stream or wrapped byte input stream
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readRootIndex(DataInput in, final int numEntries) throws IOException {
      blockOffsets = new long[numEntries];
      initialize(numEntries);
      blockDataSizes = new int[numEntries];

      // If index size is zero, no index was written.
      if (numEntries > 0) {
        for (int i = 0; i < numEntries; ++i) {
          long offset = in.readLong();
          int dataSize = in.readInt();
          byte[] key = Bytes.readByteArray(in);
          add(key, offset, dataSize);
        }
      }
    }

    protected abstract void initialize(int numEntries);

    protected abstract void add(final byte[] key, final long offset, final int dataSize);

    /**
     * Read in the root-level index from the given input stream. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @return the buffered input stream or wrapped byte input stream
     * @throws IOException
     */
    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
      DataInputStream in = blk.getByteStream();
      readRootIndex(in, numEntries);
      return in;
    }

    /**
     * Read the root-level metadata of a multi-level block index. Based on
     * {@link #readRootIndex(DataInput, int)}, but also reads metadata
     * necessary to compute the mid-key in a multi-level index.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readMultiLevelIndexRoot(HFileBlock blk,
        final int numEntries) throws IOException {
      DataInputStream in = readRootIndex(blk, numEntries);
      // After reading the root index, the checksum bytes have to be
      // subtracted to know if the mid-key metadata exists.
      int checkSumBytes = blk.totalChecksumBytes();
      if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
        // No mid-key metadata available.
        return;
      }
      midLeafBlockOffset = in.readLong();
      midLeafBlockOnDiskSize = in.readInt();
      midKeyEntry = in.readInt();
    }

    @Override
    public long heapSize() {
      // The BlockIndexReader base class holds the blockOffsets and
      // blockDataSizes arrays and the cachingBlockReader reference; the
      // blockKeys, comparator and midKey references (where present) are
      // accounted for in the subclasses.
      long heapSize = ClassSize.align(3 * ClassSize.REFERENCE +
          2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);

      // Mid-key metadata.
      heapSize += MID_KEY_METADATA_SIZE;

      heapSize = calculateHeapSizeForBlockKeys(heapSize);

      if (blockOffsets != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
            * Bytes.SIZEOF_LONG);
      }

      if (blockDataSizes != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
            * Bytes.SIZEOF_INT);
      }

      return ClassSize.align(heapSize);
    }

    protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
  }

  /**
   * Writes the block index into the output stream. Generates the tree from
   * the bottom up. The leaf level is written to disk as a sequence of inline
   * blocks, if it is larger than a certain number of bytes. If the leaf level
   * is not large enough, we write all entries to the root level instead.
   *
   * After all leaf blocks have been written, we end up with an index
   * referencing the resulting leaf index blocks. If that index is larger than
   * the allowed root index size, the writer will break it up into
   * reasonable-size intermediate-level index block chunks, write those chunks
   * out, and create another index referencing those chunks. This will be
   * repeated until the remaining index is small enough to become the root
   * index. However, in most practical cases we will only have leaf-level
   * blocks and the root index, or just the root index.
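   *
   * <p>As a rough illustration: with the default 128 KB chunk size, a leaf
   * level that serializes to about 1 MB would be flushed as roughly eight
   * inline leaf chunks, leaving a root index of only about eight entries, so
   * no intermediate level would be needed.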
   */
  public static class BlockIndexWriter implements InlineBlockWriter {
    /**
     * While the index is being written, this represents the current block
     * index referencing all leaf blocks, with one exception. If the file is
     * being closed and there are not enough blocks to complete even a single
     * leaf block, no leaf blocks get written and this contains the entire
     * block index. After all levels of the index were written by
     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
     * root-level index.
     */
    private BlockIndexChunk rootChunk = new BlockIndexChunk();

    /**
     * Current leaf-level chunk. New entries referencing data blocks get added
     * to this chunk until it grows large enough to be written to disk.
     */
    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();

    /**
     * The number of block index levels. This is one if there is only the root
     * level (even empty), two if there is a leaf level and a root level, and
     * higher if there are intermediate levels. This is only final after
     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
     * initial value accounts for the root level, and will be increased to two
     * as soon as we find out there is a leaf-level in
     * {@link #blockWritten(long, int, int)}.
     */
    private int numLevels = 1;

    private HFileBlock.Writer blockWriter;
    private byte[] firstKey = null;

    /**
     * The total number of leaf-level entries, i.e. entries referenced by
     * leaf-level blocks. For the data block index this is equal to the number
     * of data blocks.
     */
    private long totalNumEntries;

    /** Total compressed size of all index blocks. */
    private long totalBlockOnDiskSize;

    /** Total uncompressed size of all index blocks. */
    private long totalBlockUncompressedSize;

    /** The maximum size guideline of all multi-level index blocks. */
    private int maxChunkSize;

    /** The minimum number of entries in a single index block. */
    private int minIndexNumEntries;

    /** Whether we require this block index to always be single-level. */
    private boolean singleLevelOnly;

    /** CacheConfig, or null if cache-on-write is disabled */
    private CacheConfig cacheConf;

    /** Name to use for computing cache keys */
    private String nameForCaching;

    /** Creates a single-level block index writer */
    public BlockIndexWriter() {
      this(null, null, null);
      singleLevelOnly = true;
    }

    /**
     * Creates a multi-level block index writer.
     *
     * @param blockWriter the block writer to use to write index blocks
     * @param cacheConf used to determine when and how a block should be cached-on-write.
     */
    public BlockIndexWriter(HFileBlock.Writer blockWriter,
        CacheConfig cacheConf, String nameForCaching) {
      if ((cacheConf == null) != (nameForCaching == null)) {
        throw new IllegalArgumentException("Block cache and file name for " +
            "caching must be both specified or both null");
      }

      this.blockWriter = blockWriter;
      this.cacheConf = cacheConf;
      this.nameForCaching = nameForCaching;
      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
      this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
    }

    public void setMaxChunkSize(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("Invalid maximum index block size");
      }
      this.maxChunkSize = maxChunkSize;
    }

    public void setMinIndexNumEntries(int minIndexNumEntries) {
      if (minIndexNumEntries <= 1) {
        throw new IllegalArgumentException(
            "Invalid minimum index number of entries, should be >= 2");
      }
      this.minIndexNumEntries = minIndexNumEntries;
    }

    /**
     * Writes the root level and intermediate levels of the block index into
     * the output stream, generating the tree from bottom up. Assumes that the
     * leaf level has been inline-written to the disk if there is enough data
     * for more than one leaf block. We iterate by breaking the current level
     * of the block index, starting with the index of all leaf-level blocks,
     * into chunks small enough to be written to disk, and generate its parent
     * level, until we end up with a level small enough to become the root
     * level.
     *
     * If the leaf level is not large enough, there is no inline block index
     * anymore, so we only write that level of block index to disk as the root
     * level.
     *
     * @param out FSDataOutputStream
     * @return the position at which the root-level index starts
     * @throws IOException
     */
    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
        throw new IOException("Trying to write a multi-level block index, " +
            "but there are " + curInlineChunk.getNumEntries() + " entries in the " +
            "last inline chunk.");
      }

      // We need to get mid-key metadata before we create intermediate
      // indexes and overwrite the root chunk.
      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
          : null;

      if (curInlineChunk != null) {
        while (rootChunk.getRootSize() > maxChunkSize
            // HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
            && rootChunk.getNumEntries() > minIndexNumEntries
            // Sanity check. We should not hit this: 16 levels can address
            // minIndexNumEntries ^ 16 blocks.
            && numLevels < 16) {
          rootChunk = writeIntermediateLevel(out, rootChunk);
          numLevels += 1;
        }
      }

      // write the root level
      long rootLevelIndexPos = out.getPos();

      {
        DataOutput blockStream =
            blockWriter.startWriting(BlockType.ROOT_INDEX);
        rootChunk.writeRoot(blockStream);
        if (midKeyMetadata != null) {
          blockStream.write(midKeyMetadata);
        }
        blockWriter.writeHeaderAndData(out);
        if (cacheConf != null) {
          HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
          cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
            rootLevelIndexPos, true, blockForCaching.getBlockType()), blockForCaching);
        }
      }

      // Add root index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
          + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
          + " root-level entries, " + totalNumEntries + " total entries, "
          + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
          " on-disk size, "
          + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
          " total uncompressed size.");
      }
      return rootLevelIndexPos;
    }

    /**
     * Writes the block index data as a single level only. Does not do any
     * block framing.
     *
     * @param out the buffered output stream to write the index to. Typically a
     *          stream writing into an {@link HFile} block.
     * @param description a short description of the index being written. Used
     *          in a log message.
     * @throws IOException
     */
    public void writeSingleLevelIndex(DataOutput out, String description)
        throws IOException {
      expectNumLevels(1);

      if (!singleLevelOnly) {
        throw new IOException("Single-level mode is turned off");
      }

      if (rootChunk.getNumEntries() > 0) {
        throw new IOException("Root-level entries already added in " +
            "single-level mode");
      }

      rootChunk = curInlineChunk;
      curInlineChunk = new BlockIndexChunk();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a single-level " + description + " index with "
          + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
          + " bytes");
      }
      rootChunk.writeRoot(out);
    }

    /**
     * Split the current level of the block index into intermediate index
     * blocks of permitted size and write those blocks to disk. Return the next
     * level of the block index referencing those intermediate-level blocks.
     *
     * @param out the output stream to write the intermediate-level blocks to
     * @param currentLevel the current level of the block index, such as a
     *          chunk referencing all leaf-level index blocks
     * @return the parent level block index, which becomes the root index after
     *         a few (usually zero) iterations
     * @throws IOException
     */
    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
        BlockIndexChunk currentLevel) throws IOException {
      // Entries referencing intermediate-level blocks we are about to create.
      BlockIndexChunk parent = new BlockIndexChunk();

      // The current intermediate-level block index chunk.
      BlockIndexChunk curChunk = new BlockIndexChunk();

      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
        curChunk.add(currentLevel.getBlockKey(i),
            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));

        // HBASE-16288: We have to have at least minIndexNumEntries(16) items in the index so that
        // we won't end up with too many levels for an index with very large rowKeys. Also, if the
        // first key is larger than maxChunkSize this will cause infinite recursion.
        if (i >= minIndexNumEntries && curChunk.getRootSize() >= maxChunkSize) {
          writeIntermediateBlock(out, parent, curChunk);
        }
      }

      if (curChunk.getNumEntries() > 0) {
        writeIntermediateBlock(out, parent, curChunk);
      }

      return parent;
    }

    private void writeIntermediateBlock(FSDataOutputStream out,
        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
      long beginOffset = out.getPos();
      DataOutputStream dos = blockWriter.startWriting(
          BlockType.INTERMEDIATE_INDEX);
      curChunk.writeNonRoot(dos);
      byte[] curFirstKey = curChunk.getBlockKey(0);
      blockWriter.writeHeaderAndData(out);

      if (getCacheOnWrite()) {
        HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
        cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
          beginOffset, true, blockForCaching.getBlockType()), blockForCaching);
      }

      // Add intermediate index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      // OFFSET is the beginning offset of the chunk of block index entries.
      // SIZE is the total byte size of the chunk of block index entries
      // + the secondary index size.
      // FIRST_KEY is the first key in the chunk of block index entries.
      parent.add(curFirstKey, beginOffset,
          blockWriter.getOnDiskSizeWithHeader());

      // clear current block index chunk
      curChunk.clear();
      curFirstKey = null;
    }

    /**
     * @return how many block index entries there are in the root level
     */
    public final int getNumRootEntries() {
      return rootChunk.getNumEntries();
    }

    /**
     * @return the number of levels in this block index.
     */
    public int getNumLevels() {
      return numLevels;
    }

    private void expectNumLevels(int expectedNumLevels) {
      if (numLevels != expectedNumLevels) {
        throw new IllegalStateException("Number of block index levels is "
            + numLevels + " but is expected to be " + expectedNumLevels);
      }
    }

    /**
     * Whether there is an inline block ready to be written. In general, we
     * write a leaf-level index block as an inline block as soon as its size
     * as serialized in the non-root format reaches a certain threshold.
     */
    @Override
    public boolean shouldWriteBlock(boolean closing) {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (curInlineChunk == null) {
        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
            "called with closing=true and then called again?");
      }

      if (curInlineChunk.getNumEntries() == 0) {
        return false;
      }

      // We do have some entries in the current inline chunk.
      if (closing) {
        if (rootChunk.getNumEntries() == 0) {
          // We did not add any leaf-level blocks yet. Instead of creating a
          // leaf level with one block, move these entries to the root level.

          expectNumLevels(1);
          rootChunk = curInlineChunk;
          curInlineChunk = null;  // Disallow adding any more index entries.
          return false;
        }

        return true;
      } else {
        return curInlineChunk.getNonRootSize() >= maxChunkSize;
      }
    }

    /**
     * Write out the current inline index block. Inline blocks are non-root
     * blocks, so the non-root index format is used.
     *
     * @param out the data output stream to write the inline block to
     */
    @Override
    public void writeInlineBlock(DataOutput out) throws IOException {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      // Write the inline block index to the output stream in the non-root
      // index block format.
      curInlineChunk.writeNonRoot(out);

      // Save the first key of the inline block so that we can add it to the
      // parent-level index.
      firstKey = curInlineChunk.getBlockKey(0);

      // Start a new inline index block
      curInlineChunk.clear();
    }

    /**
     * Called after an inline block has been written so that we can add an
     * entry referring to that block to the parent-level index.
     */
    @Override
    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
      // Add leaf index block size
      totalBlockOnDiskSize += onDiskSize;
      totalBlockUncompressedSize += uncompressedSize;

      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (firstKey == null) {
        throw new IllegalStateException("Trying to add second-level index " +
            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
            " but the first key was not set in writeInlineBlock");
      }
1333
1334      if (rootChunk.getNumEntries() == 0) {
1335        // We are writing the first leaf block, so increase index level.
1336        expectNumLevels(1);
1337        numLevels = 2;
1338      }
1339
1340      // Add another entry to the second-level index. Include the number of
1341      // entries in all previous leaf-level chunks for mid-key calculation.
1342      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1343      firstKey = null;
1344    }
1345
1346    @Override
1347    public BlockType getInlineBlockType() {
1348      return BlockType.LEAF_INDEX;
1349    }
1350
    /**
     * Add one index entry to the current leaf-level block. When the leaf-level
     * block gets large enough, it will be flushed to disk as an inline block.
     *
     * @param firstKey the first key of the data block
     * @param blockOffset the offset of the data block
     * @param blockDataSize the on-disk size of the data block ({@link HFile}
     *          format version 2), or the uncompressed size of the data block
     *          ({@link HFile} format version 1).
     */
    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
      ++totalNumEntries;
    }

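    // A minimal usage sketch (hypothetical caller, loosely modeled on how an
    // HFile writer drives an InlineBlockWriter; the names are illustrative):
    //
    //   // after each data block is finished:
    //   indexWriter.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
    //   if (indexWriter.shouldWriteBlock(false)) {
    //     // flush a leaf-level inline index block in the non-root format,
    //     // then register it with the parent (root) level
    //     indexWriter.writeInlineBlock(out);
    //     indexWriter.blockWritten(indexBlockOffset, indexBlockOnDiskSize,
    //         indexBlockUncompressedSize);
    //   }
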
    /**
     * @throws IOException if we happened to write a multi-level index.
     */
    public void ensureSingleLevel() throws IOException {
      if (numLevels > 1) {
        throw new IOException("Wrote a " + numLevels + "-level index with " +
            rootChunk.getNumEntries() + " root-level entries, but " +
            "this is expected to be a single-level block index.");
      }
    }

    /**
     * @return true if we are using cache-on-write. This is configured by the
     *         caller of the constructor by either passing a valid block cache
     *         or null.
     */
    @Override
    public boolean getCacheOnWrite() {
      return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
    }

    /**
     * The total uncompressed size of the root index block, intermediate-level
     * index blocks, and leaf-level index blocks.
     *
     * @return the total uncompressed size of all index blocks
     */
    public long getTotalUncompressedSize() {
      return totalBlockUncompressedSize;
    }

  }

  /**
   * A single chunk of the block index in the process of writing. The data in
   * this chunk can become a leaf-level, intermediate-level, or root index
   * block.
   */
  static class BlockIndexChunk {

    /** First keys of the key range corresponding to each index entry. */
    private final List<byte[]> blockKeys = new ArrayList<>();

    /** Block offset in backing stream. */
    private final List<Long> blockOffsets = new ArrayList<>();

    /** On-disk data sizes of lower-level data or index blocks. */
    private final List<Integer> onDiskDataSizes = new ArrayList<>();

    /**
     * The cumulative number of sub-entries, i.e. the number of entries in the
     * deeper-level blocks that this chunk's entries point to.
     * numSubEntriesAt[i] is the total number of sub-entries in the blocks
     * corresponding to this chunk's entries #0 through #i inclusive.
     */
    private final List<Long> numSubEntriesAt = new ArrayList<>();

    /**
     * The offset of the next entry to be added, relative to the end of the
     * "secondary index" in the "non-root" format representation of this index
     * chunk. This is the next value to be added to the secondary index.
     */
    private int curTotalNonRootEntrySize = 0;

    /**
     * The accumulated size of this chunk if stored in the root index format.
     */
    private int curTotalRootSize = 0;

    /**
     * The "secondary index" used for binary search over variable-length
     * records in a "non-root" format block. These offsets are relative to the
     * end of this secondary index.
     */
    private final List<Integer> secondaryIndexOffsetMarks = new ArrayList<>();

    /**
     * Adds a new entry to this block index chunk.
     *
     * @param firstKey the first key in the block pointed to by this entry
     * @param blockOffset the offset of the next-level block pointed to by this
     *          entry
     * @param onDiskDataSize the on-disk data size of the block pointed to by
     *          this entry, including header size
     * @param curTotalNumSubEntries if this chunk is the root index chunk under
     *          construction, this specifies the current total number of
     *          sub-entries in all leaf-level chunks, including the one
     *          corresponding to the second-level entry being added.
     */
    void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
        long curTotalNumSubEntries) {
      // Record the offset for the secondary index
      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
          + firstKey.length;

      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;

      blockKeys.add(firstKey);
      blockOffsets.add(blockOffset);
      onDiskDataSizes.add(onDiskDataSize);

      if (curTotalNumSubEntries != -1) {
        numSubEntriesAt.add(curTotalNumSubEntries);

        // Make sure the parallel arrays are in sync.
        if (numSubEntriesAt.size() != blockKeys.size()) {
          throw new IllegalStateException("Only have key/value count " +
              "stats for " + numSubEntriesAt.size() + " block index " +
              "entries out of " + blockKeys.size());
        }
      }
    }

    /**
     * The same as {@link #add(byte[], long, int, long)} but does not take the
     * key/value count into account. Used for single-level indexes.
     *
     * @see #add(byte[], long, int, long)
     */
    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
      add(firstKey, blockOffset, onDiskDataSize, -1);
    }

    public void clear() {
      blockKeys.clear();
      blockOffsets.clear();
      onDiskDataSizes.clear();
      secondaryIndexOffsetMarks.clear();
      numSubEntriesAt.clear();
      curTotalNonRootEntrySize = 0;
      curTotalRootSize = 0;
    }

    /**
     * Finds the entry corresponding to the deeper-level index block containing
     * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
     * ordering of sub-entries.
     *
     * <p>
     * <i>Implementation note.</i> We are looking for i such that
     * numSubEntriesAt[i - 1] &lt;= k &lt; numSubEntriesAt[i], because the
     * deeper-level block #i (0-based) contains sub-entries
     * #numSubEntriesAt[i - 1] through #(numSubEntriesAt[i] - 1), assuming a
     * global 0-based ordering of sub-entries. i is by definition the
     * insertion point of k in numSubEntriesAt.
     *
     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
     * @return the 0-based index of the entry corresponding to the given
     *         sub-entry
     */
    public int getEntryBySubEntry(long k) {
      // We define mid-key as the key corresponding to k'th sub-entry
      // (0-based).

      int i = Collections.binarySearch(numSubEntriesAt, k);

      // Exact match: numSubEntriesAt[i] = k. This means chunks #0 through
      // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
      // is in the (i + 1)'th chunk.
      if (i >= 0) {
        return i + 1;
      }

      // Inexact match. Return the insertion point.
      return -i - 1;
    }

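    // Worked example: with numSubEntriesAt = [3, 7, 10], chunk #0 holds
    // sub-entries 0..2, chunk #1 holds 3..6, and chunk #2 holds 7..9.
    //   k = 5 -> binarySearch returns -2 (insertion point 1), so -(-2) - 1 = 1
    //   k = 7 -> binarySearch returns 1 (exact match), so 1 + 1 = 2
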
    /**
     * Used when writing the root block index of a multi-level block index.
     * Serializes additional information that allows the mid-key to be
     * identified efficiently.
     *
     * @return a few serialized fields for finding the mid-key
     * @throws IOException if the metadata for computing the mid-key could not
     *           be created
     */
    public byte[] getMidKeyMetadata() throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(
          MID_KEY_METADATA_SIZE);
      DataOutputStream baosDos = new DataOutputStream(baos);
      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
      if (totalNumSubEntries == 0) {
        throw new IOException("No leaf-level entries, mid-key unavailable");
      }
      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);

      baosDos.writeLong(blockOffsets.get(midKeyEntry));
      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));

      long numSubEntriesBefore = midKeyEntry > 0
          ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE) {
        throw new IOException("Could not identify mid-key index within the "
            + "leaf-level block containing mid-key: out of range ("
            + subEntryWithinEntry + ", numSubEntriesBefore="
            + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
            + ")");
      }

      baosDos.writeInt((int) subEntryWithinEntry);

      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
        throw new IOException("Could not write mid-key metadata: size=" +
            baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
      }

      // Close just to be good citizens, although this has no effect.
      baos.close();

      return baos.toByteArray();
    }

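    // The metadata written above is three fixed-width fields, so
    // MID_KEY_METADATA_SIZE works out to 8 + 4 + 4 = 16 bytes:
    //   long  offset of the leaf-level block containing the mid-key
    //   int   on-disk size of that block
    //   int   0-based index of the mid-key within that block
    // A reader can recover them with a plain DataInputStream, e.g. (sketch):
    //   DataInputStream in = new DataInputStream(new ByteArrayInputStream(md));
    //   long blockOffset = in.readLong();
    //   int blockOnDiskSize = in.readInt();
    //   int midKeyIndex = in.readInt();
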
    /**
     * Writes the block index chunk in the non-root index block format. This
     * format contains the number of entries, an index of integer offsets
     * for quick binary search on variable-length records, and tuples of
     * block offset, on-disk block size, and the first key for each entry.
     *
     * @param out the data output stream to write the chunk to
     * @throws IOException if the chunk's bookkeeping is inconsistent or the
     *           stream cannot be written to
     */
    void writeNonRoot(DataOutput out) throws IOException {
      // The number of entries in the block.
      out.writeInt(blockKeys.size());

      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
        throw new IOException("Corrupted block index chunk writer: " +
            blockKeys.size() + " entries but " +
            secondaryIndexOffsetMarks.size() + " secondary index items");
      }

      // For each entry, write a "secondary index" of relative offsets to the
      // entries from the end of the secondary index. This works, because at
      // read time we read the number of entries and know where the secondary
      // index ends.
      for (int currentSecondaryIndex : secondaryIndexOffsetMarks) {
        out.writeInt(currentSecondaryIndex);
      }

      // We include one extra element in the secondary index so that the size
      // of each entry can be computed by subtracting adjacent secondary index
      // elements.
      out.writeInt(curTotalNonRootEntrySize);

      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        out.write(blockKeys.get(i));
      }
    }

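    // Resulting on-disk layout for a chunk with n entries:
    //   int      n, the number of entries
    //   int[n]   secondary index: each entry's offset, relative to the end of
    //            the secondary index
    //   int      total size of all entries (the extra element, so entry i's
    //            size is secondaryIndex[i + 1] - secondaryIndex[i])
    //   n times: long block offset, int on-disk data size, then the first key
    //            (its length is implied by the secondary index)
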
    /**
     * @return the size of this chunk if stored in the non-root index block
     *         format
     */
    int getNonRootSize() {
      return Bytes.SIZEOF_INT                          // Number of entries
          + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
          + curTotalNonRootEntrySize;                  // All entries
    }

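    // Worked example (assuming SECONDARY_INDEX_ENTRY_OVERHEAD =
    // SIZEOF_LONG + SIZEOF_INT = 12): a chunk with two entries whose first
    // keys are 8 and 12 bytes long has
    //   curTotalNonRootEntrySize = (12 + 8) + (12 + 12) = 44
    // and therefore a non-root size of 4 + 4 * (2 + 1) + 44 = 60 bytes.
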
    /**
     * Writes this chunk into the given output stream in the root block index
     * format. This format is similar to the {@link HFile} version 1 block
     * index format, except that we store the on-disk size of the block instead
     * of its uncompressed size.
     *
     * @param out the data output stream to write the block index to. Typically
     *          a stream writing into an {@link HFile} block.
     * @throws IOException if the stream cannot be written to
     */
    void writeRoot(DataOutput out) throws IOException {
      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        Bytes.writeByteArray(out, blockKeys.get(i));
      }
    }

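    // Root layout per entry: a long block offset, an int on-disk size, and
    // the first key written via Bytes.writeByteArray (vint length prefix
    // followed by the key bytes). Unlike the non-root format, there is no
    // entry count or secondary index here; the reader is expected to learn
    // the number of root entries from elsewhere (e.g. the file's trailer).
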
    /**
     * @return the size of this chunk if stored in the root index block format
     */
    int getRootSize() {
      return curTotalRootSize;
    }

    /**
     * @return the number of entries in this block index chunk
     */
    public int getNumEntries() {
      return blockKeys.size();
    }

    public byte[] getBlockKey(int i) {
      return blockKeys.get(i);
    }

    public long getBlockOffset(int i) {
      return blockOffsets.get(i);
    }

    public int getOnDiskDataSize(int i) {
      return onDiskDataSizes.get(i);
    }

    public long getCumulativeNumKV(int i) {
      if (i < 0) {
        return 0;
      }
      return numSubEntriesAt.get(i);
    }

  }

  public static int getMaxChunkSize(Configuration conf) {
    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
  }

  public static int getMinIndexNumEntries(Configuration conf) {
    return conf.getInt(MIN_INDEX_NUM_ENTRIES_KEY, DEFAULT_MIN_INDEX_NUM_ENTRIES);
  }
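
  // Example (a sketch; HBaseConfiguration is the usual source of the conf):
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(MAX_CHUNK_SIZE_KEY, 64 * 1024); // smaller index chunks
  //   int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf);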
}