1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FSDataOutputStream;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.KeyValue.KVComparator;
40  import org.apache.hadoop.hbase.KeyValueUtil;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.io.HeapSize;
43  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
44  import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
45  import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
46  import org.apache.hadoop.hbase.util.ByteBufferUtils;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.ClassSize;
49  import org.apache.hadoop.io.WritableUtils;
50  import org.apache.hadoop.util.StringUtils;
51  
52  /**
53   * Provides functionality to write ({@link BlockIndexWriter}) and read
54   * ({@link BlockIndexReader}) single-level and multi-level block indexes.
56   *
57   * Examples of how to use the block index writer can be found in
58   * {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter} and
59   *  {@link HFileWriterV2}. Examples of how to use the reader can be
60   *  found in {@link HFileReaderV2} and
61   *  {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
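     *
     * <p>A minimal sketch of the writer-side flow (editor's illustration of
     * the {@link InlineBlockWriter} contract; variable names such as
     * <code>hfileBlockWriter</code> and <code>out</code> are assumed, not
     * taken from HFileWriterV2):
     *
     * <pre>{@code
     * BlockIndexWriter indexWriter =
     *     new BlockIndexWriter(hfileBlockWriter, cacheConf, hfileName);
     * // For every data block written, register its first key, offset and size.
     * indexWriter.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
     * // Periodically flush full leaf-level chunks as inline blocks.
     * if (indexWriter.shouldWriteBlock(false)) {
     *   DataOutputStream dos =
     *       hfileBlockWriter.startWriting(indexWriter.getInlineBlockType());
     *   indexWriter.writeInlineBlock(dos);
     *   hfileBlockWriter.writeHeaderAndData(out);
     *   indexWriter.blockWritten(blockOffset, onDiskSize, uncompressedSize);
     * }
     * // On close, write intermediate levels and the root level.
     * long rootIndexOffset = indexWriter.writeIndexBlocks(out);
     * }</pre>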
62   */
63  @InterfaceAudience.Private
64  public class HFileBlockIndex {
65  
66    private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
67  
68    static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
69  
70    /**
71    * The maximum size guideline for index blocks (leaf, intermediate, and
72    * root alike). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
73     */
74    public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
75  
76    /**
77     * Minimum number of entries in a single index block. Even if we are above the
78     * hfile.index.block.max.size we will keep writing to the same block unless we have that many
79     * entries. We should have at least a few entries so that we don't have too many levels in the
80     * multi-level index. This should be at least 2 to make sure there is no infinite recursion.
81     */
82    public static final String MIN_INDEX_NUM_ENTRIES_KEY = "hfile.index.block.min.entries";
83  
84    static final int DEFAULT_MIN_INDEX_NUM_ENTRIES = 16;
85  
86    /**
87     * The number of bytes stored in each "secondary index" entry in addition to
88     * key bytes in the non-root index block format. The first long is the file
89     * offset of the deeper-level block the entry points to, and the int that
90     * follows is that block's on-disk size without including header.
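      *
      * For example (editor's arithmetic): an entry whose first key is 13 bytes
      * long occupies 8 (offset) + 4 (on-disk size) + 13 (key) = 25 bytes in
      * the entry area, i.e. SECONDARY_INDEX_ENTRY_OVERHEAD + key length.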
91     */
92    static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
93        + Bytes.SIZEOF_LONG;
94  
95    /**
96     * Error message when trying to use inline block API in single-level mode.
97     */
98    private static final String INLINE_BLOCKS_NOT_ALLOWED =
99        "Inline blocks are not allowed in the single-level-only mode";
100 
101   /**
102    * The size of a meta-data record used for finding the mid-key in a
103    * multi-level index. Consists of the middle leaf-level index block offset
104    * (long), its on-disk size without header included (int), and the mid-key
105    * entry's zero-based index in that leaf index block.
106    */
107   private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
108       2 * Bytes.SIZEOF_INT;
109 
110   /**
111    * The reader will always hold the root level index in the memory. Index
112    * blocks at all other levels will be cached in the LRU cache in practice,
113    * although this API does not enforce that.
114    *
115    * All non-root (leaf and intermediate) index blocks contain what we call a
116    * "secondary index": an array of offsets to the entries within the block.
117    * This allows us to do binary search for the entry corresponding to the
118    * given key without having to deserialize the block.
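      *
      * <p>A sketch of how a version-2 reader wires this up (editor's
      * illustration; the trailer accessor names are assumed from the v2
      * format, and a scanner would normally pass its current block instead of
      * null):
      *
      * <pre>{@code
      * BlockIndexReader indexReader = new BlockIndexReader(
      *     comparator, trailer.getNumDataIndexLevels(), cachingBlockReader);
      * indexReader.readMultiLevelIndexRoot(rootIndexBlock,
      *     trailer.getDataIndexCount());
      * // Per lookup: find the data block that may contain the key.
      * HFileBlock dataBlock = indexReader.seekToDataBlock(key, null,
      *     true,   // cacheBlocks
      *     true,   // pread
      *     false,  // isCompaction
      *     null);  // accept any data block encoding
      * }</pre>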
119    */
120   public static class BlockIndexReader implements HeapSize {
121     /** Needed for doing lookups on blocks. */
122     private final KVComparator comparator;
123 
124     // Root-level data.
125     private byte[][] blockKeys;
126     private long[] blockOffsets;
127     private int[] blockDataSizes;
128     private int rootCount = 0;
129 
130     // Mid-key metadata.
131     private long midLeafBlockOffset = -1;
132     private int midLeafBlockOnDiskSize = -1;
133     private int midKeyEntry = -1;
134 
135     /** Pre-computed mid-key */
136     private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();
137 
138     /**
139      * The number of levels in the block index tree. One if there is only root
140      * level, two for root and leaf levels, etc.
141      */
142     private int searchTreeLevel;
143 
144     /** A way to read {@link HFile} blocks at a given offset */
145     private CachingBlockReader cachingBlockReader;
146 
147     public BlockIndexReader(final KVComparator c, final int treeLevel,
148         final CachingBlockReader cachingBlockReader) {
149       this(c, treeLevel);
150       this.cachingBlockReader = cachingBlockReader;
151     }
152 
153     public BlockIndexReader(final KVComparator c, final int treeLevel)
154     {
155       comparator = c;
156       searchTreeLevel = treeLevel;
157     }
158 
159     /**
160      * @return true if the block index is empty.
161      */
162     public boolean isEmpty() {
163       return blockKeys.length == 0;
164     }
165 
166     /**
167      * Verifies that the block index is non-empty and throws an
168      * {@link IllegalStateException} otherwise.
169      */
170     public void ensureNonEmpty() {
171       if (blockKeys.length == 0) {
172         throw new IllegalStateException("Block index is empty or not loaded");
173       }
174     }
175 
176     /**
177      * Return the data block which contains this key. This function will only
178      * be called when the HFile version is larger than 1.
179      *
180      * @param key the key we are looking for
181      * @param currentBlock the current block, to avoid re-reading the same block
182      * @param cacheBlocks whether blocks read on the way down should be cached
183      * @param pread whether to use positional read rather than seek+read
184      * @param isCompaction whether the lookup is done on behalf of a compaction
185      * @param expectedDataBlockEncoding the data block encoding the caller is
186      *          expecting the data block to be in, or null to not perform this
187      *          check and return the block irrespective of the encoding
188      * @return the data block containing the given key, or null if not found
189      * @throws IOException
190      */
191     public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
192         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
193         throws IOException {
194       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
195           cacheBlocks,
196           pread, isCompaction, expectedDataBlockEncoding);
197       if (blockWithScanInfo == null) {
198         return null;
199       } else {
200         return blockWithScanInfo.getHFileBlock();
201       }
202     }
203 
204     /**
205      * Return the BlockWithScanInfo which contains the DataBlock with other scan
206      * info such as nextIndexedKey. This function will only be called when the
207      * HFile version is larger than 1.
208      *
209      * @param key
210      *          the key we are looking for
211      * @param currentBlock
212      *          the current block, to avoid re-reading the same block
213      * @param cacheBlocks whether blocks read on the way down should be cached
214      * @param pread whether to use positional read rather than seek+read
215      * @param isCompaction whether the lookup is done on behalf of a compaction
216      * @param expectedDataBlockEncoding the data block encoding the caller is
217      *          expecting the data block to be in, or null to not perform this
218      *          check and return the block irrespective of the encoding.
219      * @return the BlockWithScanInfo which contains the DataBlock with other
220      *         scan info such as nextIndexedKey.
221      * @throws IOException
222      */
223     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
224         boolean cacheBlocks,
225         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
226         throws IOException {
227       int rootLevelIndex = rootBlockContainingKey(key);
228       if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
229         return null;
230       }
231 
232       // the next indexed key
233       Cell nextIndexedKey = null;
234 
235       // Read the next-level (intermediate or leaf) index block.
236       long currentOffset = blockOffsets[rootLevelIndex];
237       int currentOnDiskSize = blockDataSizes[rootLevelIndex];
238 
239       if (rootLevelIndex < blockKeys.length - 1) {
240         nextIndexedKey = new KeyValue.KeyOnlyKeyValue(blockKeys[rootLevelIndex + 1]);
241       } else {
242         nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
243       }
244 
245       int lookupLevel = 1; // How many levels deep we are in our lookup.
246       int index = -1;
247 
248       HFileBlock block;
249       while (true) {
250 
251         if (currentBlock != null && currentBlock.getOffset() == currentOffset)
252         {
253           // Avoid reading the same block again, even with caching turned off.
254           // This is crucial for compaction-type workload which might have
255           // caching turned off. This is like a one-block cache inside the
256           // scanner.
257           block = currentBlock;
258         } else {
259           // Call HFile's caching block reader API. We always cache index
260           // blocks, otherwise we might get terrible performance.
261           boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
262           BlockType expectedBlockType;
263           if (lookupLevel < searchTreeLevel - 1) {
264             expectedBlockType = BlockType.INTERMEDIATE_INDEX;
265           } else if (lookupLevel == searchTreeLevel - 1) {
266             expectedBlockType = BlockType.LEAF_INDEX;
267           } else {
268             // this also accounts for ENCODED_DATA
269             expectedBlockType = BlockType.DATA;
270           }
271           block = cachingBlockReader.readBlock(currentOffset,
272           currentOnDiskSize, shouldCache, pread, isCompaction, true,
273               expectedBlockType, expectedDataBlockEncoding);
274         }
275 
276         if (block == null) {
277           throw new IOException("Failed to read block at offset " +
278               currentOffset + ", onDiskSize=" + currentOnDiskSize);
279         }
280 
281         // Found a data block, break the loop and check our level in the tree.
282         if (block.getBlockType().isData()) {
283           break;
284         }
285 
286         // Not a data block. This must be a leaf-level or intermediate-level
287         // index block. We don't allow going deeper than searchTreeLevel.
288         if (++lookupLevel > searchTreeLevel) {
289           throw new IOException("Search Tree Level overflow: lookupLevel="+
290               lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
291         }
292 
293         // Locate the entry corresponding to the given key in the non-root
294         // (leaf or intermediate-level) index block.
295         ByteBuffer buffer = block.getBufferWithoutHeader();
296         index = locateNonRootIndexEntry(buffer, key, comparator);
297         if (index == -1) {
298           // This has to be changed
299           // For now change this to key value
300           KeyValue kv = KeyValueUtil.ensureKeyValue(key);
301           throw new IOException("The key "
302               + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
303               + " is before the" + " first key of the non-root index block "
304               + block);
305         }
306 
307         currentOffset = buffer.getLong();
308         currentOnDiskSize = buffer.getInt();
309 
310         // Only update next indexed key if there is a next indexed key in the current level
311         byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1);
312         if (tmpNextIndexedKey != null) {
313           nextIndexedKey = new KeyValue.KeyOnlyKeyValue(tmpNextIndexedKey);
314         }
315       }
316 
317       if (lookupLevel != searchTreeLevel) {
318         throw new IOException("Reached a data block at level " + lookupLevel +
319             " but the number of levels is " + searchTreeLevel);
320       }
321 
322       // set the next indexed key for the current block.
323       BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
324       return blockWithScanInfo;
325     }
326 
327     /**
328      * An approximation to the {@link HFile}'s mid-key. Operates on block
329      * boundaries, and does not go inside blocks. In other words, returns the
330      * first key of the middle block of the file.
331      *
332      * @return the first key of the middle block
333      */
334     public byte[] midkey() throws IOException {
335       if (rootCount == 0)
336         throw new IOException("HFile empty");
337 
338       byte[] targetMidKey = this.midKey.get();
339       if (targetMidKey != null) {
340         return targetMidKey;
341       }
342 
343       if (midLeafBlockOffset >= 0) {
344         if (cachingBlockReader == null) {
345           throw new IOException("Have to read the middle leaf block but " +
346               "no block reader available");
347         }
348 
349         // Caching, using pread, assuming this is not a compaction.
350         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
351             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
352             BlockType.LEAF_INDEX, null);
353 
354         ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
355         int numDataBlocks = b.getInt();
356         int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
357         int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
358             keyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
359         int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
360             + SECONDARY_INDEX_ENTRY_OVERHEAD;
361         targetMidKey = ByteBufferUtils.toBytes(b, keyOffset, keyLen);
362       } else {
363         // The middle of the root-level index.
364         targetMidKey = blockKeys[rootCount / 2];
365       }
366 
367       this.midKey.set(targetMidKey);
368       return targetMidKey;
369     }
370 
371     /**
372      * @param i from 0 to {@link #getRootBlockCount()} - 1
373      */
374     public byte[] getRootBlockKey(int i) {
375       return blockKeys[i];
376     }
377 
378     /**
379      * @param i from 0 to {@link #getRootBlockCount()} - 1
380      */
381     public long getRootBlockOffset(int i) {
382       return blockOffsets[i];
383     }
384 
385     /**
386      * @param i zero-based index of a root-level block
387      * @return the on-disk size of the root-level block for version 2, or the
388      *         uncompressed size for version 1
389      */
390     public int getRootBlockDataSize(int i) {
391       return blockDataSizes[i];
392     }
393 
394     /**
395      * @return the number of root-level blocks in this block index
396      */
397     public int getRootBlockCount() {
398       return rootCount;
399     }
400 
401     /**
402      * Finds the root-level index block containing the given key.
403      *
404      * @param key
405      *          Key to find
406      * @return Index of the root-level block containing <code>key</code>
407      *         (between 0 and the number of blocks - 1), or -1 if this file
408      *         does not contain the requested key.
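       *
       *         <p>For example (editor's illustration), with root keys
       *         {"bbb", "ddd", "fff"}: looking up "ccc" returns 0 (the block
       *         starting at "bbb"), "ddd" returns 1 (exact match), and "aaa"
       *         returns -1 (before the first key of the file).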
409      */
410     public int rootBlockContainingKey(final byte[] key, int offset, int length) {
411       int pos = Bytes.binarySearch(blockKeys, key, offset, length, comparator);
412       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
413       // binarySearch's javadoc.
414 
415       if (pos >= 0) {
416         // This means this is an exact match with an element of blockKeys.
417         assert pos < blockKeys.length;
418         return pos;
419       }
420 
421       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
422       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
423       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
424       // key < blockKeys[0], meaning the file does not contain the given key.
425 
426       int i = -pos - 1;
427       assert 0 <= i && i <= blockKeys.length;
428       return i - 1;
429     }
430 
431     /**
432      * Finds the root-level index block containing the given key.
433      *
434      * @param key
435      *          Key to find
436      */
437     public int rootBlockContainingKey(final Cell key) {
438       int pos = Bytes.binarySearch(blockKeys, key, comparator);
439       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
440       // binarySearch's javadoc.
441 
442       if (pos >= 0) {
443         // This means this is an exact match with an element of blockKeys.
444         assert pos < blockKeys.length;
445         return pos;
446       }
447 
448       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
449       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
450       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
451       // key < blockKeys[0], meaning the file does not contain the given key.
452 
453       int i = -pos - 1;
454       assert 0 <= i && i <= blockKeys.length;
455       return i - 1;
456     }
457 
458     /**
459      * Adds a new entry in the root block index. Only used when reading.
460      *
461      * @param key Last key in the block
462      * @param offset file offset where the block is stored
463      * @param dataSize the uncompressed data size
464      */
465     private void add(final byte[] key, final long offset, final int dataSize) {
466       blockOffsets[rootCount] = offset;
467       blockKeys[rootCount] = key;
468       blockDataSizes[rootCount] = dataSize;
469       rootCount++;
470     }
471 
472     /**
473      * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
474      * @param nonRootIndex the non-root index block buffer, without header
475      * @param i the ith position
476      * @return the indexed key at the ith position, or null if i is out of range
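       *
       *         <p>Worked example of the arithmetic below (editor's note):
       *         with numEntries = 3, the secondary index is the four ints
       *         following the entry count, so the entry area starts at byte
       *         offset 4 * (3 + 2) = 20, and the key of entry i starts at
       *         20 + secondaryIndex[i] + SECONDARY_INDEX_ENTRY_OVERHEAD.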
477      */
478     private byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
479       int numEntries = nonRootIndex.getInt(0);
480       if (i < 0 || i >= numEntries) {
481         return null;
482       }
483 
484       // Entries start after the number of entries and the secondary index.
485       // The secondary index takes numEntries + 1 ints.
486       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
487      // Target key's offset relative to the end of the secondary index
488       int targetKeyRelOffset = nonRootIndex.getInt(
489           Bytes.SIZEOF_INT * (i + 1));
490 
491       // The offset of the target key in the blockIndex buffer
492       int targetKeyOffset = entriesOffset     // Skip secondary index
493           + targetKeyRelOffset               // Skip all entries until mid
494           + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
495 
496       // We subtract the two consecutive secondary index elements, which
497       // gives us the size of the whole (offset, onDiskSize, key) tuple. We
498       // then need to subtract the overhead of offset and onDiskSize.
499       int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
500         targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
501 
502       return ByteBufferUtils.toBytes(nonRootIndex, targetKeyOffset, targetKeyLength);
503     }
504 
505     /**
506      * Performs a binary search over a non-root level index block. Utilizes the
507      * secondary index, which records the offsets of (offset, onDiskSize,
508      * firstKey) tuples of all entries.
509      *
510      * @param key
511      *          the key we are searching for
512      * @param nonRootIndex
513      *          the non-root index block buffer, starting with the secondary
514      *          index. The position is ignored.
515      * @param comparator the comparator used to compare keys
516      * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
517      *         keys[i + 1], if keys is the array of all keys being searched, or
518      *         -1 otherwise
519      * @throws IOException
520      */
521     static int binarySearchNonRootIndex(Cell key, ByteBuffer nonRootIndex,
522         KVComparator comparator) {
523 
524       int numEntries = nonRootIndex.getInt(0);
525       int low = 0;
526       int high = numEntries - 1;
527       int mid = 0;
528 
529       // Entries start after the number of entries and the secondary index.
530       // The secondary index takes numEntries + 1 ints.
531       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
532 
533       // If we imagine that keys[-1] = -Infinity and
534       // keys[numEntries] = Infinity, then we are maintaining an invariant that
535       // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
536       KeyValue.KeyOnlyKeyValue nonRootIndexKV = new KeyValue.KeyOnlyKeyValue();
537       while (low <= high) {
538         mid = (low + high) >>> 1;
539 
540         // Midkey's offset relative to the end of secondary index
541         int midKeyRelOffset = nonRootIndex.getInt(
542             Bytes.SIZEOF_INT * (mid + 1));
543 
544         // The offset of the middle key in the blockIndex buffer
545         int midKeyOffset = entriesOffset       // Skip secondary index
546             + midKeyRelOffset                  // Skip all entries until mid
547             + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
548 
549         // We subtract the two consecutive secondary index elements, which
550         // gives us the size of the whole (offset, onDiskSize, key) tuple. We
551         // then need to subtract the overhead of offset and onDiskSize.
552         int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
553             midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
554 
555         // we have to compare in this order, because the comparator order
556         // has special logic when the 'left side' is a special key.
557         // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
558         // done after HBASE-12224 & HBASE-12282
559         nonRootIndexKV.setKey(nonRootIndex.array(),
560             nonRootIndex.arrayOffset() + midKeyOffset, midLength);
561         int cmp = comparator.compareOnlyKeyPortion(key, nonRootIndexKV);
562 
563         // key lives above the midpoint
564         if (cmp > 0)
565           low = mid + 1; // Maintain the invariant that keys[low - 1] < key
566         // key lives below the midpoint
567         else if (cmp < 0)
568           high = mid - 1; // Maintain the invariant that key < keys[high + 1]
569         else
570           return mid; // exact match
571       }
572 
573       // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
574       // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
575       // condition, low >= high + 1. Therefore, low = high + 1.
576 
577       if (low != high + 1) {
578         throw new IllegalStateException("Binary search broken: low=" + low
579             + " " + "instead of " + (high + 1));
580       }
581 
582       // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
583       // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
584       int i = low - 1;
585 
586       // Some extra validation on the result.
587       if (i < -1 || i >= numEntries) {
588         throw new IllegalStateException("Binary search broken: result is " +
589             i + " but expected to be between -1 and (numEntries - 1) = " +
590             (numEntries - 1));
591       }
592 
593       return i;
594     }
595 
596     /**
597      * Search for one key using the secondary index in a non-root block. In case
598      * of success, positions the provided buffer at the entry of interest, where
599      * the file offset and the on-disk-size can be read.
600      *
601      * @param nonRootBlock
602      *          a non-root block without header. Initial position does not
603      *          matter.
604      * @param key
605      *          the key to search for
606      * @return the index position where the given key was found, or -1 if
607      *         the given key is before the first key in the block
608      *
609      */
610     static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, Cell key,
611         KVComparator comparator) {
612       int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
613 
614       if (entryIndex != -1) {
615         int numEntries = nonRootBlock.getInt(0);
616 
617         // The end of secondary index and the beginning of entries themselves.
618         int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
619 
620         // The offset of the entry we are interested in relative to the end of
621         // the secondary index.
622         int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT * (1 + entryIndex));
623 
624         nonRootBlock.position(entriesOffset + entryRelOffset);
625       }
626 
627       return entryIndex;
628     }
629 
630     /**
631      * Read in the root-level index from the given input stream. Must match
632      * what was written into the root level by
633      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
634      * offset that function returned.
635      *
636      * @param in the buffered input stream or wrapped byte input stream
637      * @param numEntries the number of root-level index entries
638      * @throws IOException
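       *
       * <p>The root-level entry layout consumed here, summarized (editor's
       * diagram of the read loop below):
       *
       * <pre>
       *   repeated numEntries times:
       *     long   blockOffset
       *     int    onDiskDataSize
       *     vint   keyLength  (as read by Bytes.readByteArray)
       *     byte[] key
       * </pre>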
639      */
640     public void readRootIndex(DataInput in, final int numEntries)
641         throws IOException {
642       blockOffsets = new long[numEntries];
643       blockKeys = new byte[numEntries][];
644       blockDataSizes = new int[numEntries];
645 
646       // If index size is zero, no index was written.
647       if (numEntries > 0) {
648         for (int i = 0; i < numEntries; ++i) {
649           long offset = in.readLong();
650           int dataSize = in.readInt();
651           byte[] key = Bytes.readByteArray(in);
652           add(key, offset, dataSize);
653         }
654       }
655     }
656     
657     /**
658      * Read in the root-level index from the given input stream. Must match
659      * what was written into the root level by
660      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
661      * offset that function returned.
662      *
663      * @param blk the HFile block
664      * @param numEntries the number of root-level index entries
665      * @return the buffered input stream or wrapped byte input stream
666      * @throws IOException
667      */
668     public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
669       DataInputStream in = blk.getByteStream();
670       readRootIndex(in, numEntries);
671       return in;
672     }
673 
674     /**
675      * Read the root-level metadata of a multi-level block index. Based on
676      * {@link #readRootIndex(DataInput, int)}, but also reads metadata
677      * necessary to compute the mid-key in a multi-level index.
678      *
679      * @param blk the HFile block
680      * @param numEntries the number of root-level index entries
681      * @throws IOException
682      */
683     public void readMultiLevelIndexRoot(HFileBlock blk,
684         final int numEntries) throws IOException {
685       DataInputStream in = readRootIndex(blk, numEntries);
686       // after reading the root index the checksum bytes have to
687       // be subtracted to know if the mid key exists.
688       int checkSumBytes = blk.totalChecksumBytes();
689       if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
690         // No mid-key metadata available.
691         return;
692       }
693       midLeafBlockOffset = in.readLong();
694       midLeafBlockOnDiskSize = in.readInt();
695       midKeyEntry = in.readInt();
696     }
697 
698     @Override
699     public String toString() {
700       StringBuilder sb = new StringBuilder();
701       sb.append("size=" + rootCount).append("\n");
702       for (int i = 0; i < rootCount; i++) {
703         sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
704             .append("\n  offset=").append(blockOffsets[i])
705             .append(", dataSize=" + blockDataSizes[i]).append("\n");
706       }
707       return sb.toString();
708     }
709 
710     @Override
711     public long heapSize() {
712       long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
713           2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
714 
715       // Mid-key metadata.
716       heapSize += MID_KEY_METADATA_SIZE;
717 
718       // Calculating the size of blockKeys
719       if (blockKeys != null) {
720         // Adding array + references overhead
721         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
722             * ClassSize.REFERENCE);
723 
724         // Adding bytes
725         for (byte[] key : blockKeys) {
726           heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
727         }
728       }
729 
730       if (blockOffsets != null) {
731         heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
732             * Bytes.SIZEOF_LONG);
733       }
734 
735       if (blockDataSizes != null) {
736         heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
737             * Bytes.SIZEOF_INT);
738       }
739 
740       return ClassSize.align(heapSize);
741     }
742 
743   }
744 
745   /**
746    * Writes the block index into the output stream. Generate the tree from
747    * bottom up. The leaf level is written to disk as a sequence of inline
748    * blocks, if it is larger than a certain number of bytes. If the leaf level
749    * is not large enough, we write all entries to the root level instead.
750    *
751    * After all leaf blocks have been written, we end up with an index
752    * referencing the resulting leaf index blocks. If that index is larger than
753    * the allowed root index size, the writer will break it up into
754    * reasonable-size intermediate-level index block chunks, write those chunks
755    * out, and create another index referencing those chunks. This will be
756    * repeated until the remaining index is small enough to become the root
757    * index. However, in most practical cases we will only have leaf-level
758    * blocks and the root index, or just the root index.
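    *
    * <p>For a sense of scale (editor's arithmetic, assuming the default
    * 128 KB chunk guideline and entries of roughly 50 bytes each): one leaf
    * chunk holds on the order of 2,500 entries, so a two-level index covers
    * about 2,500 data blocks and a three-level index roughly
    * 2,500 * 2,500 = 6.25 million, which is why indexes deeper than three
    * levels are rare in practice.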
759    */
760   public static class BlockIndexWriter implements InlineBlockWriter {
761     /**
762      * While the index is being written, this represents the current block
763      * index referencing all leaf blocks, with one exception. If the file is
764      * being closed and there are not enough blocks to complete even a single
765      * leaf block, no leaf blocks get written and this contains the entire
766      * block index. After all levels of the index were written by
767      * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
768      * root-level index.
769      */
770     private BlockIndexChunk rootChunk = new BlockIndexChunk();
771 
772     /**
773      * Current leaf-level chunk. New entries referencing data blocks get added
774      * to this chunk until it grows large enough to be written to disk.
775      */
776     private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
777 
778     /**
779      * The number of block index levels. This is one if there is only root
780    * level (even empty), two if there is a leaf level and a root level, and is
781      * higher if there are intermediate levels. This is only final after
782      * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
783      * initial value accounts for the root level, and will be increased to two
784      * as soon as we find out there is a leaf-level in
785      * {@link #blockWritten(long, int, int)}.
786      */
787     private int numLevels = 1;
788 
789     private HFileBlock.Writer blockWriter;
790     private byte[] firstKey = null;
791 
792     /**
793      * The total number of leaf-level entries, i.e. entries referenced by
794      * leaf-level blocks. For the data block index this is equal to the number
795      * of data blocks.
796      */
797     private long totalNumEntries;
798 
799     /** Total compressed size of all index blocks. */
800     private long totalBlockOnDiskSize;
801 
802     /** Total uncompressed size of all index blocks. */
803     private long totalBlockUncompressedSize;
804 
805     /** The maximum size guideline of all multi-level index blocks. */
806     private int maxChunkSize;
807 
808    /** The minimum number of entries in each multi-level index block */
809     private int minIndexNumEntries;
810 
811     /** Whether we require this block index to always be single-level. */
812     private boolean singleLevelOnly;
813 
814     /** CacheConfig, or null if cache-on-write is disabled */
815     private CacheConfig cacheConf;
816 
817     /** Name to use for computing cache keys */
818     private String nameForCaching;
819 
820     /** Creates a single-level block index writer */
821     public BlockIndexWriter() {
822       this(null, null, null);
823       singleLevelOnly = true;
824     }
825 
826     /**
827      * Creates a multi-level block index writer.
828      *
829      * @param blockWriter the block writer to use to write index blocks
830      * @param cacheConf used to determine when and how a block should be cached-on-write.
831      */
832     public BlockIndexWriter(HFileBlock.Writer blockWriter,
833         CacheConfig cacheConf, String nameForCaching) {
834       if ((cacheConf == null) != (nameForCaching == null)) {
835         throw new IllegalArgumentException("Block cache and file name for " +
836             "caching must be both specified or both null");
837       }
838 
839       this.blockWriter = blockWriter;
840       this.cacheConf = cacheConf;
841       this.nameForCaching = nameForCaching;
842       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
843       this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
844     }
845 
846     public void setMaxChunkSize(int maxChunkSize) {
847       if (maxChunkSize <= 0) {
848         throw new IllegalArgumentException("Invalid maximum index block size");
849       }
850       this.maxChunkSize = maxChunkSize;
851     }
852 
853     public void setMinIndexNumEntries(int minIndexNumEntries) {
854       if (minIndexNumEntries <= 1) {
855        throw new IllegalArgumentException("Invalid minimum number of index entries, should be >= 2");
856       }
857       this.minIndexNumEntries = minIndexNumEntries;
858     }
859 
860     /**
861      * Writes the root level and intermediate levels of the block index into
862      * the output stream, generating the tree from bottom up. Assumes that the
863      * leaf level has been inline-written to the disk if there is enough data
864      * for more than one leaf block. We iterate by breaking the current level
865      * of the block index, starting with the index of all leaf-level blocks,
866      * into chunks small enough to be written to disk, and generate its parent
867      * level, until we end up with a level small enough to become the root
868      * level.
869      *
870      * If the leaf level is not large enough, there is no inline block index
871      * anymore, so we only write that level of block index to disk as the root
872      * level.
873      *
874      * @param out the FSDataOutputStream to write the index blocks to
875      * @return position at which we entered the root-level index.
876      * @throws IOException
877      */
878     public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
879       if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
880         throw new IOException("Trying to write a multi-level block index, " +
881            "but there are " + curInlineChunk.getNumEntries() + " entries in the" +
882             "last inline chunk.");
883       }
884 
885       // We need to get mid-key metadata before we create intermediate
886       // indexes and overwrite the root chunk.
887       byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
888           : null;
889 
890       if (curInlineChunk != null) {
891         while (rootChunk.getRootSize() > maxChunkSize
892             // HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
893             && rootChunk.getNumEntries() > minIndexNumEntries
894            // Sanity check. We will not hit this, since (minIndexNumEntries ^ 16) blocks can be addressed
895             && numLevels < 16) {
896           rootChunk = writeIntermediateLevel(out, rootChunk);
897           numLevels += 1;
898         }
899       }
900 
901       // write the root level
902       long rootLevelIndexPos = out.getPos();
903 
904       {
905         DataOutput blockStream =
906             blockWriter.startWriting(BlockType.ROOT_INDEX);
907         rootChunk.writeRoot(blockStream);
908         if (midKeyMetadata != null)
909           blockStream.write(midKeyMetadata);
910         blockWriter.writeHeaderAndData(out);
911       }
912 
913       // Add root index block size
914       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
915       totalBlockUncompressedSize +=
916           blockWriter.getUncompressedSizeWithoutHeader();
917 
918       if (LOG.isTraceEnabled()) {
919         LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
920           + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
921           + " root-level entries, " + totalNumEntries + " total entries, "
922           + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
923           " on-disk size, "
924           + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
925           " total uncompressed size.");
926       }
927       return rootLevelIndexPos;
928     }
929 
930     /**
931      * Writes the block index data as a single level only. Does not do any
932      * block framing.
933      *
934      * @param out the buffered output stream to write the index to. Typically a
935      *          stream writing into an {@link HFile} block.
936      * @param description a short description of the index being written. Used
937      *          in a log message.
938      * @throws IOException
939      */
940     public void writeSingleLevelIndex(DataOutput out, String description)
941         throws IOException {
942       expectNumLevels(1);
943 
944       if (!singleLevelOnly)
945         throw new IOException("Single-level mode is turned off");
946 
947       if (rootChunk.getNumEntries() > 0)
948         throw new IOException("Root-level entries already added in " +
949             "single-level mode");
950 
951       rootChunk = curInlineChunk;
952       curInlineChunk = new BlockIndexChunk();
953 
954       if (LOG.isTraceEnabled()) {
955         LOG.trace("Wrote a single-level " + description + " index with "
956           + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
957           + " bytes");
958       }
959       rootChunk.writeRoot(out);
960     }
961 
962     /**
963      * Split the current level of the block index into intermediate index
964      * blocks of permitted size and write those blocks to disk. Return the next
965      * level of the block index referencing those intermediate-level blocks.
966      *
967      * @param out the output stream to write the intermediate-level blocks to
968      * @param currentLevel the current level of the block index, such as a
969      *          chunk referencing all leaf-level index blocks
970      * @return the parent level block index, which becomes the root index after
971      *         a few (usually zero) iterations
972      * @throws IOException
973      */
974     private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
975         BlockIndexChunk currentLevel) throws IOException {
976       // Entries referencing intermediate-level blocks we are about to create.
977       BlockIndexChunk parent = new BlockIndexChunk();
978 
979       // The current intermediate-level block index chunk.
980       BlockIndexChunk curChunk = new BlockIndexChunk();
981 
982       for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
983         curChunk.add(currentLevel.getBlockKey(i),
984             currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
985 
986         // HBASE-16288: We have to have at least minIndexNumEntries(16) items in the index so that
987        // we won't end up with too many levels for an index with very large rowKeys. Also, if the
988         // first key is larger than maxChunkSize this will cause infinite recursion.
989         if (i >= minIndexNumEntries && curChunk.getRootSize() >= maxChunkSize) {
990           writeIntermediateBlock(out, parent, curChunk);
991         }
992       }
993 
994       if (curChunk.getNumEntries() > 0) {
995         writeIntermediateBlock(out, parent, curChunk);
996       }
997 
998       return parent;
999     }
1000 
1001     private void writeIntermediateBlock(FSDataOutputStream out,
1002         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
1003       long beginOffset = out.getPos();
1004       DataOutputStream dos = blockWriter.startWriting(
1005           BlockType.INTERMEDIATE_INDEX);
1006       curChunk.writeNonRoot(dos);
1007       byte[] curFirstKey = curChunk.getBlockKey(0);
1008       blockWriter.writeHeaderAndData(out);
1009 
1010       if (cacheConf != null) {
1011         HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1012         cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
1013           beginOffset), blockForCaching);
1014       }
1015 
1016       // Add intermediate index block size
1017       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1018       totalBlockUncompressedSize +=
1019           blockWriter.getUncompressedSizeWithoutHeader();
1020 
1021      // OFFSET is the beginning offset of the chunk of block index entries.
1022       // SIZE is the total byte size of the chunk of block index entries
1023       // + the secondary index size
1024       // FIRST_KEY is the first key in the chunk of block index
1025       // entries.
1026       parent.add(curFirstKey, beginOffset,
1027           blockWriter.getOnDiskSizeWithHeader());
1028 
1029       // clear current block index chunk
1030       curChunk.clear();
1031       curFirstKey = null;
1032     }
1033 
1034     /**
1035      * @return how many block index entries there are in the root level
1036      */
1037     public final int getNumRootEntries() {
1038       return rootChunk.getNumEntries();
1039     }
1040 
1041     /**
1042      * @return the number of levels in this block index.
1043      */
1044     public int getNumLevels() {
1045       return numLevels;
1046     }
1047 
1048     private void expectNumLevels(int expectedNumLevels) {
1049       if (numLevels != expectedNumLevels) {
1050         throw new IllegalStateException("Number of block index levels is "
1051            + numLevels + " but is expected to be " + expectedNumLevels);
1052       }
1053     }
1054 
1055     /**
1056      * Whether there is an inline block ready to be written. In general, we
1057     * write a leaf-level index block as an inline block as soon as its size
1058      * as serialized in the non-root format reaches a certain threshold.
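      *
      * <p>For example (editor's note): with the default 128 KB guideline, a
      * call with {@code closing == false} returns true only once the chunk's
      * non-root serialized size reaches maxChunkSize, while a call with
      * {@code closing == true} before any leaf block has been written instead
      * promotes the pending entries to the root level and returns false.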
1059      */
1060     @Override
1061     public boolean shouldWriteBlock(boolean closing) {
1062       if (singleLevelOnly) {
1063         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1064       }
1065 
1066       if (curInlineChunk == null) {
1067         throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
1068             "called with closing=true and then called again?");
1069       }
1070 
1071       if (curInlineChunk.getNumEntries() == 0) {
1072         return false;
1073       }
1074 
1075       // We do have some entries in the current inline chunk.
1076       if (closing) {
1077         if (rootChunk.getNumEntries() == 0) {
1078           // We did not add any leaf-level blocks yet. Instead of creating a
1079           // leaf level with one block, move these entries to the root level.
1080 
1081           expectNumLevels(1);
1082           rootChunk = curInlineChunk;
1083           curInlineChunk = null;  // Disallow adding any more index entries.
1084           return false;
1085         }
1086 
1087         return true;
1088       } else {
1089         return curInlineChunk.getNonRootSize() >= maxChunkSize;
1090       }
1091     }
1092 
1093     /**
1094      * Write out the current inline index block. Inline blocks are non-root
1095      * blocks, so the non-root index format is used.
1096      *
1097     * @param out the output stream to write the inline block to
1098      */
1099     @Override
1100     public void writeInlineBlock(DataOutput out) throws IOException {
1101       if (singleLevelOnly)
1102         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1103 
1104       // Write the inline block index to the output stream in the non-root
1105       // index block format.
1106       curInlineChunk.writeNonRoot(out);
1107 
1108       // Save the first key of the inline block so that we can add it to the
1109       // parent-level index.
1110       firstKey = curInlineChunk.getBlockKey(0);
1111 
1112       // Start a new inline index block
1113       curInlineChunk.clear();
1114     }
1115 
1116     /**
1117      * Called after an inline block has been written so that we can add an
1118      * entry referring to that block to the parent-level index.
1119      */
1120     @Override
1121     public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
1122       // Add leaf index block size
1123       totalBlockOnDiskSize += onDiskSize;
1124       totalBlockUncompressedSize += uncompressedSize;
1125 
1126       if (singleLevelOnly)
1127         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1128 
1129       if (firstKey == null) {
1130         throw new IllegalStateException("Trying to add second-level index " +
1131             "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
1132            " but the first key was not set in writeInlineBlock");
1133       }
1134 
1135       if (rootChunk.getNumEntries() == 0) {
1136         // We are writing the first leaf block, so increase index level.
1137         expectNumLevels(1);
1138         numLevels = 2;
1139       }
1140 
1141       // Add another entry to the second-level index. Include the number of
1142       // entries in all previous leaf-level chunks for mid-key calculation.
1143       rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1144       firstKey = null;
1145     }
1146 
1147     @Override
1148     public BlockType getInlineBlockType() {
1149       return BlockType.LEAF_INDEX;
1150     }
1151 
1152     /**
1153      * Add one index entry to the current leaf-level block. When the leaf-level
1154      * block gets large enough, it will be flushed to disk as an inline block.
1155      *
1156      * @param firstKey the first key of the data block
1157      * @param blockOffset the offset of the data block
1158      * @param blockDataSize the on-disk size of the data block ({@link HFile}
1159      *          format version 2), or the uncompressed size of the data block
1160      *          ({@link HFile} format version 1).
1161      */
1162     public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
1163       curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1164       ++totalNumEntries;
1165     }
1166 
1167     /**
1168      * @throws IOException if we happened to write a multi-level index.
1169      */
1170     public void ensureSingleLevel() throws IOException {
1171       if (numLevels > 1) {
1172         throw new IOException ("Wrote a " + numLevels + "-level index with " +
1173             rootChunk.getNumEntries() + " root-level entries, but " +
1174             "this is expected to be a single-level block index.");
1175       }
1176     }
1177 
1178     /**
1179      * @return true if we are using cache-on-write. This is configured by the
1180      *         caller of the constructor by either passing a valid block cache
1181      *         or null.
1182      */
1183     @Override
1184     public boolean getCacheOnWrite() {
1185       return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1186     }
1187 
1188     /**
1189      * The total uncompressed size of the root index block, intermediate-level
1190      * index blocks, and leaf-level index blocks.
1191      *
1192      * @return the total uncompressed size of all index blocks
1193      */
1194     public long getTotalUncompressedSize() {
1195       return totalBlockUncompressedSize;
1196     }
1197 
1198   }
1199 
1200   /**
1201    * A single chunk of the block index in the process of writing. The data in
1202    * this chunk can become a leaf-level, intermediate-level, or root index
1203    * block.
1204    */
1205   static class BlockIndexChunk {
1206 
1207     /** First keys of the key range corresponding to each index entry. */
1208     private final List<byte[]> blockKeys = new ArrayList<byte[]>();
1209 
1210     /** Block offset in backing stream. */
1211     private final List<Long> blockOffsets = new ArrayList<Long>();
1212 
1213     /** On-disk data sizes of lower-level data or index blocks. */
1214     private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
1215 
1216     /**
1217     * The cumulative number of sub-entries, i.e. entries in the deeper-level
1218     * blocks that this chunk's entries point to. numSubEntriesAt[i] is the number of
1219     * sub-entries in the blocks corresponding to this chunk's entries #0 through #i inclusively.
1220      */
1221     private final List<Long> numSubEntriesAt = new ArrayList<Long>();
1222 
1223     /**
1224      * The offset of the next entry to be added, relative to the end of the
1225      * "secondary index" in the "non-root" format representation of this index
1226      * chunk. This is the next value to be added to the secondary index.
1227      */
1228     private int curTotalNonRootEntrySize = 0;
1229 
1230     /**
1231      * The accumulated size of this chunk if stored in the root index format.
1232      */
1233     private int curTotalRootSize = 0;
1234 
1235     /**
1236      * The "secondary index" used for binary search over variable-length
1237      * records in a "non-root" format block. These offsets are relative to the
1238      * end of this secondary index.
1239      */
1240     private final List<Integer> secondaryIndexOffsetMarks =
1241         new ArrayList<Integer>();
1242 
1243     /**
1244      * Adds a new entry to this block index chunk.
1245      *
1246      * @param firstKey the first key in the block pointed to by this entry
1247      * @param blockOffset the offset of the next-level block pointed to by this
1248      *          entry
1249     * @param onDiskDataSize the on-disk data size of the block pointed to by this
1250      *          entry, including header size
1251      * @param curTotalNumSubEntries if this chunk is the root index chunk under
1252      *          construction, this specifies the current total number of
1253      *          sub-entries in all leaf-level chunks, including the one
1254      *          corresponding to the second-level entry being added.
1255      */
1256     void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1257         long curTotalNumSubEntries) {
1258       // Record the offset for the secondary index
1259       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1260       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1261           + firstKey.length;
1262 
1263       curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1264           + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1265 
1266       blockKeys.add(firstKey);
1267       blockOffsets.add(blockOffset);
1268       onDiskDataSizes.add(onDiskDataSize);
1269 
1270       if (curTotalNumSubEntries != -1) {
1271         numSubEntriesAt.add(curTotalNumSubEntries);
1272 
1273         // Make sure the parallel arrays are in sync.
1274         if (numSubEntriesAt.size() != blockKeys.size()) {
1275           throw new IllegalStateException("Only have key/value count " +
1276               "stats for " + numSubEntriesAt.size() + " block index " +
1277               "entries out of " + blockKeys.size());
1278         }
1279       }
1280     }
1281 
1282     /**
1283     * The same as {@link #add(byte[], long, int, long)} but does not maintain
1284     * the cumulative sub-entry count (passes -1 for it). Used for single-level indexes.
1285     *
1286     * @see #add(byte[], long, int, long)
1287      */
1288     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1289       add(firstKey, blockOffset, onDiskDataSize, -1);
1290     }
1291 
1292     public void clear() {
1293       blockKeys.clear();
1294       blockOffsets.clear();
1295       onDiskDataSizes.clear();
1296       secondaryIndexOffsetMarks.clear();
1297       numSubEntriesAt.clear();
1298       curTotalNonRootEntrySize = 0;
1299       curTotalRootSize = 0;
1300     }
1301 
1302     /**
1303      * Finds the entry corresponding to the deeper-level index block containing
1304      * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1305      * ordering of sub-entries.
1306      *
1307      * <p>
1308      * <i> Implementation note. </i> We are looking for i such that
1309      * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
1310      * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
1311      * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
1312      * sub-entries. i is by definition the insertion point of k in
1313      * numSubEntriesAt.
1314      *
1315     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1316      * @return the 0-based index of the entry corresponding to the given
1317      *         sub-entry
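     *
     *         <p>For example (editor's illustration): if numSubEntriesAt =
     *         {4, 7, 9}, i.e. the chunks hold 4, 3 and 2 sub-entries, then
     *         k = 5 yields insertion point 1 (sub-entry #5 lives in chunk #1),
     *         and the exact match k = 4 at index 0 also maps to chunk #1,
     *         since chunk #0 only holds sub-entries #0..#3.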
1318      */
1319     public int getEntryBySubEntry(long k) {
1320       // We define mid-key as the key corresponding to k'th sub-entry
1321       // (0-based).
1322 
1323       int i = Collections.binarySearch(numSubEntriesAt, k);
1324 
1325       // Exact match: cumulativeWeight[i] = k. This means chunks #0 through
1326       // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1327       // is in the (i + 1)'th chunk.
1328       if (i >= 0)
1329         return i + 1;
1330 
1331       // Inexact match. Return the insertion point.
1332       return -i - 1;
1333     }
1334 
1335     /**
1336      * Used when writing the root block index of a multi-level block index.
1337     * Serializes additional information that allows us to efficiently identify the
1338      * mid-key.
1339      *
1340      * @return a few serialized fields for finding the mid-key
1341     * @throws IOException if metadata for computing the mid-key could not be created
1342      */
1343     public byte[] getMidKeyMetadata() throws IOException {
1344       ByteArrayOutputStream baos = new ByteArrayOutputStream(
1345           MID_KEY_METADATA_SIZE);
1346       DataOutputStream baosDos = new DataOutputStream(baos);
1347       long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
1348       if (totalNumSubEntries == 0) {
1349         throw new IOException("No leaf-level entries, mid-key unavailable");
1350       }
1351       long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1352       int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1353 
1354       baosDos.writeLong(blockOffsets.get(midKeyEntry));
1355       baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1356 
1357       long numSubEntriesBefore = midKeyEntry > 0
1358           ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1359       long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1360       if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE)
1361       {
1362         throw new IOException("Could not identify mid-key index within the "
1363             + "leaf-level block containing mid-key: out of range ("
1364             + subEntryWithinEntry + ", numSubEntriesBefore="
1365             + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1366             + ")");
1367       }
1368 
1369       baosDos.writeInt((int) subEntryWithinEntry);
1370 
1371       if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1372         throw new IOException("Could not write mid-key metadata: size=" +
1373             baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1374       }
1375 
1376       // Close just to be good citizens, although this has no effect.
1377       baos.close();
1378 
1379       return baos.toByteArray();
1380     }
1381 
1382     /**
1383      * Writes the block index chunk in the non-root index block format. This
1384      * format contains the number of entries, an index of integer offsets
1385      * for quick binary search on variable-length records, and tuples of
1386      * block offset, on-disk block size, and the first key for each entry.
1387      *
1388      * @param out the output to write the chunk to, in the non-root format
1389      * @throws IOException
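      *
      * <p>The resulting layout, summarized (editor's diagram of the writes
      * below):
      *
      * <pre>
      *   int               numEntries
      *   int[numEntries]   entry offsets, relative to the end of this
      *                     secondary index
      *   int               total size of all entries (the extra element)
      *   repeated:         long blockOffset, int onDiskDataSize, byte[] key
      * </pre>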
1390      */
1391     void writeNonRoot(DataOutput out) throws IOException {
1392       // The number of entries in the block.
1393       out.writeInt(blockKeys.size());
1394 
1395       if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1396         throw new IOException("Corrupted block index chunk writer: " +
1397             blockKeys.size() + " entries but " +
1398             secondaryIndexOffsetMarks.size() + " secondary index items");
1399       }
1400 
1401       // For each entry, write a "secondary index" of relative offsets to the
1402       // entries from the end of the secondary index. This works, because at
1403       // read time we read the number of entries and know where the secondary
1404       // index ends.
1405       for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
1406         out.writeInt(currentSecondaryIndex);
1407 
1408       // We include one other element in the secondary index to calculate the
1409       // size of each entry more easily by subtracting secondary index elements.
1410       out.writeInt(curTotalNonRootEntrySize);
1411 
1412       for (int i = 0; i < blockKeys.size(); ++i) {
1413         out.writeLong(blockOffsets.get(i));
1414         out.writeInt(onDiskDataSizes.get(i));
1415         out.write(blockKeys.get(i));
1416       }
1417     }
1418 
1419     /**
1420      * @return the size of this chunk if stored in the non-root index block
1421      *         format
1422      */
1423     int getNonRootSize() {
1424       return Bytes.SIZEOF_INT                          // Number of entries
1425           + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1426           + curTotalNonRootEntrySize;                  // All entries
1427     }
1428 
1429     /**
1430      * Writes this chunk into the given output stream in the root block index
1431      * format. This format is similar to the {@link HFile} version 1 block
1432      * index format, except that we store on-disk size of the block instead of
1433      * its uncompressed size.
1434      *
1435      * @param out the data output stream to write the block index to. Typically
1436      *          a stream writing into an {@link HFile} block.
1437      * @throws IOException
1438      */
1439     void writeRoot(DataOutput out) throws IOException {
1440       for (int i = 0; i < blockKeys.size(); ++i) {
1441         out.writeLong(blockOffsets.get(i));
1442         out.writeInt(onDiskDataSizes.get(i));
1443         Bytes.writeByteArray(out, blockKeys.get(i));
1444       }
1445     }
1446 
1447     /**
1448      * @return the size of this chunk if stored in the root index block format
1449      */
1450     int getRootSize() {
1451       return curTotalRootSize;
1452     }
1453 
1454     /**
1455      * @return the number of entries in this block index chunk
1456      */
1457     public int getNumEntries() {
1458       return blockKeys.size();
1459     }
1460 
1461     public byte[] getBlockKey(int i) {
1462       return blockKeys.get(i);
1463     }
1464 
1465     public long getBlockOffset(int i) {
1466       return blockOffsets.get(i);
1467     }
1468 
1469     public int getOnDiskDataSize(int i) {
1470       return onDiskDataSizes.get(i);
1471     }
1472 
1473     public long getCumulativeNumKV(int i) {
1474       if (i < 0)
1475         return 0;
1476       return numSubEntriesAt.get(i);
1477     }
1478 
1479   }
1480 
1481   public static int getMaxChunkSize(Configuration conf) {
1482     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1483   }
1484 
1485   public static int getMinIndexNumEntries(Configuration conf) {
1486     return conf.getInt(MIN_INDEX_NUM_ENTRIES_KEY, DEFAULT_MIN_INDEX_NUM_ENTRIES);
1487   }
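
       // A usage sketch (editor's illustration, assuming a standard
       // HBaseConfiguration): tightening the index chunk guideline, e.g. in a
       // test or an offline tool.
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
       //   conf.setInt(HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY, 32);
       //   int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf);          // 65536
       //   int minNumEntries = HFileBlockIndex.getMinIndexNumEntries(conf);   // 32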
1488 }