/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * Provides functionality to write ({@link BlockIndexWriter}) and read
 * ({@link BlockIndexReader}) single-level and multi-level block indexes.
 *
 * Examples of how to use the block index writer can be found in
 * {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter}
 * and {@link HFileWriterV2}. Examples of how to use the reader can be
 * found in {@link HFileReaderV2} and TestHFileBlockIndex.
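 *
 * <p>A minimal writer-side sketch (hypothetical keys, offsets and sizes; a
 * single-level index is assumed, error handling omitted):
 *
 * <pre>{@code
 * HFileBlockIndex.BlockIndexWriter biw = new HFileBlockIndex.BlockIndexWriter();
 * biw.addEntry(Bytes.toBytes("row-0001"), 0L, 4096);    // first data block
 * biw.addEntry(Bytes.toBytes("row-5000"), 4096L, 4096); // second data block
 * biw.writeSingleLevelIndex(out, "data");               // out is a DataOutput
 * }</pre>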
 */
@InterfaceAudience.Private
public class HFileBlockIndex {

  private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);

  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;

  /**
   * The maximum size guideline for index blocks (leaf, intermediate, and
   * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
   */
  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";

  /**
   * Minimum number of entries in a single index block. Even if we are above the
   * hfile.index.block.max.size we will keep writing to the same block unless we have that many
   * entries. We should have at least a few entries so that we don't have too many levels in the
   * multi-level index. This should be at least 2 to make sure there is no infinite recursion.
   */
  public static final String MIN_INDEX_NUM_ENTRIES_KEY = "hfile.index.block.min.entries";

  static final int DEFAULT_MIN_INDEX_NUM_ENTRIES = 16;

  /**
   * The number of bytes stored in each "secondary index" entry in addition to
   * key bytes in the non-root index block format. The first long is the file
   * offset of the deeper-level block the entry points to, and the int that
   * follows is that block's on-disk size without including header.
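   *
   * <p>Schematically (a sketch of the layout just described, not an
   * additional format):
   *
   * <pre>
   *   block offset (long) | on-disk size w/o header (int) | key bytes
   * </pre>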
   */
  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
      + Bytes.SIZEOF_LONG;

  /**
   * Error message when trying to use inline block API in single-level mode.
   */
  private static final String INLINE_BLOCKS_NOT_ALLOWED =
      "Inline blocks are not allowed in the single-level-only mode";

  /**
   * The size of a meta-data record used for finding the mid-key in a
   * multi-level index. Consists of the middle leaf-level index block offset
   * (long), its on-disk size without header included (int), and the mid-key
   * entry's zero-based index in that leaf index block.
   */
  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
      2 * Bytes.SIZEOF_INT;

  /**
   * The reader will always hold the root level index in memory. Index
   * blocks at all other levels will be cached in the LRU cache in practice,
   * although this API does not enforce that.
   *
   * All non-root (leaf and intermediate) index blocks contain what we call a
   * "secondary index": an array of offsets to the entries within the block.
   * This allows us to do binary search for the entry corresponding to the
   * given key without having to deserialize the block.
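   *
   * <p>A minimal read-path sketch (hypothetical variable names; assumes the
   * root index block and its entry count were already located via the file
   * trailer):
   *
   * <pre>{@code
   * BlockIndexReader idx = new BlockIndexReader(comparator, numLevels, blockReader);
   * idx.readMultiLevelIndexRoot(rootIndexBlock, numRootEntries);
   * HFileBlock dataBlock = idx.seekToDataBlock(key, null, true, true, false, null);
   * }</pre>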
   */
  public static class BlockIndexReader implements HeapSize {
    /** Needed when doing lookups on blocks. */
    private final KVComparator comparator;

    // Root-level data.
    private byte[][] blockKeys;
    private long[] blockOffsets;
    private int[] blockDataSizes;
    private int rootCount = 0;

    // Mid-key metadata.
    private long midLeafBlockOffset = -1;
    private int midLeafBlockOnDiskSize = -1;
    private int midKeyEntry = -1;

    /** Pre-computed mid-key */
    private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();

    /**
     * The number of levels in the block index tree. One if there is only the
     * root level, two for root and leaf levels, etc.
     */
    private int searchTreeLevel;

    /** A way to read {@link HFile} blocks at a given offset */
    private CachingBlockReader cachingBlockReader;

    public BlockIndexReader(final KVComparator c, final int treeLevel,
        final CachingBlockReader cachingBlockReader) {
      this(c, treeLevel);
      this.cachingBlockReader = cachingBlockReader;
    }

    public BlockIndexReader(final KVComparator c, final int treeLevel) {
      comparator = c;
      searchTreeLevel = treeLevel;
    }

    /**
     * @return true if the block index is empty.
     */
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * Verifies that the block index is non-empty and throws an
     * {@link IllegalStateException} otherwise.
     */
    public void ensureNonEmpty() {
      if (blockKeys.length == 0) {
        throw new IllegalStateException("Block index is empty or not loaded");
      }
    }

    /**
     * Return the data block which contains this key. This function will only
     * be called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param cacheBlocks whether blocks read for this lookup should be added
     *          to the block cache
     * @param pread whether to use positional read
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding
     * @return the data block containing the given key, or null if the key is
     *         not contained in this file
     * @throws IOException
     */
    public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException {
      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
          cacheBlocks, pread, isCompaction, expectedDataBlockEncoding);
      if (blockWithScanInfo == null) {
        return null;
      } else {
        return blockWithScanInfo.getHFileBlock();
      }
    }

    /**
     * Return the BlockWithScanInfo, which contains the DataBlock together with
     * other scan info such as nextIndexedKey. This function will only be
     * called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param cacheBlocks whether blocks read for this lookup should be added
     *          to the block cache
     * @param pread whether to use positional read
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding.
     * @return the BlockWithScanInfo which contains the DataBlock with other
     *         scan info such as nextIndexedKey.
     * @throws IOException
     */
    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException {
      int rootLevelIndex = rootBlockContainingKey(key);
      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
        return null;
      }

      // the next indexed key
      Cell nextIndexedKey = null;

      // Read the next-level (intermediate or leaf) index block.
      long currentOffset = blockOffsets[rootLevelIndex];
      int currentOnDiskSize = blockDataSizes[rootLevelIndex];

      if (rootLevelIndex < blockKeys.length - 1) {
        nextIndexedKey = new KeyValue.KeyOnlyKeyValue(blockKeys[rootLevelIndex + 1]);
      } else {
        nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY;
      }

      int lookupLevel = 1; // How many levels deep we are in our lookup.
      int index = -1;

      HFileBlock block;
      while (true) {

        if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
          // Avoid reading the same block again, even with caching turned off.
          // This is crucial for compaction-type workload which might have
          // caching turned off. This is like a one-block cache inside the
          // scanner.
          block = currentBlock;
        } else {
          // Call HFile's caching block reader API. We always cache index
          // blocks, otherwise we might get terrible performance.
          boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
          BlockType expectedBlockType;
          if (lookupLevel < searchTreeLevel - 1) {
            expectedBlockType = BlockType.INTERMEDIATE_INDEX;
          } else if (lookupLevel == searchTreeLevel - 1) {
            expectedBlockType = BlockType.LEAF_INDEX;
          } else {
            // this also accounts for ENCODED_DATA
            expectedBlockType = BlockType.DATA;
          }
          block = cachingBlockReader.readBlock(currentOffset,
              currentOnDiskSize, shouldCache, pread, isCompaction, true,
              expectedBlockType, expectedDataBlockEncoding);
        }

        if (block == null) {
          throw new IOException("Failed to read block at offset " +
              currentOffset + ", onDiskSize=" + currentOnDiskSize);
        }

        // Found a data block, break the loop and check our level in the tree.
        if (block.getBlockType().isData()) {
          break;
        }

        // Not a data block. This must be a leaf-level or intermediate-level
        // index block. We don't allow going deeper than searchTreeLevel.
        if (++lookupLevel > searchTreeLevel) {
          throw new IOException("Search Tree Level overflow: lookupLevel=" +
              lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
        }

        // Locate the entry corresponding to the given key in the non-root
        // (leaf or intermediate-level) index block.
        ByteBuffer buffer = block.getBufferWithoutHeader();
        index = locateNonRootIndexEntry(buffer, key, comparator);
        if (index == -1) {
          // This has to be changed
          // For now change this to key value
          KeyValue kv = KeyValueUtil.ensureKeyValue(key);
          throw new IOException("The key "
              + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
              + " is before the first key of the non-root index block " + block);
        }

        currentOffset = buffer.getLong();
        currentOnDiskSize = buffer.getInt();

        // Only update next indexed key if there is a next indexed key in the current level
        byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1);
        if (tmpNextIndexedKey != null) {
          nextIndexedKey = new KeyValue.KeyOnlyKeyValue(tmpNextIndexedKey);
        }
      }

      if (lookupLevel != searchTreeLevel) {
        throw new IOException("Reached a data block at level " + lookupLevel +
            " but the number of levels is " + searchTreeLevel);
      }

      // set the next indexed key for the current block.
      BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
      return blockWithScanInfo;
    }

    /**
     * An approximation to the {@link HFile}'s mid-key. Operates on block
     * boundaries, and does not go inside blocks. In other words, returns the
     * first key of the middle block of the file.
     *
     * @return the first key of the middle block
     */
    public byte[] midkey() throws IOException {
      if (rootCount == 0)
        throw new IOException("HFile empty");

      byte[] targetMidKey = this.midKey.get();
      if (targetMidKey != null) {
        return targetMidKey;
      }

      if (midLeafBlockOffset >= 0) {
        if (cachingBlockReader == null) {
          throw new IOException("Have to read the middle leaf block but " +
              "no block reader available");
        }

        // Caching, using pread, assuming this is not a compaction.
        HFileBlock midLeafBlock = cachingBlockReader.readBlock(
            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
            BlockType.LEAF_INDEX, null);

        ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
        int numDataBlocks = b.getInt();
        int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
        int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
            keyRelOffset;
        int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
            + SECONDARY_INDEX_ENTRY_OVERHEAD;
        targetMidKey = ByteBufferUtils.toBytes(b, keyOffset, keyLen);
      } else {
        // The middle of the root-level index.
        targetMidKey = blockKeys[rootCount / 2];
      }

      this.midKey.set(targetMidKey);
      return targetMidKey;
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public byte[] getRootBlockKey(int i) {
      return blockKeys[i];
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public long getRootBlockOffset(int i) {
      return blockOffsets[i];
    }

    /**
     * @param i zero-based index of a root-level block
     * @return the on-disk size of the root-level block for version 2, or the
     *         uncompressed size for version 1
     */
    public int getRootBlockDataSize(int i) {
      return blockDataSizes[i];
    }

    /**
     * @return the number of root-level blocks in this block index
     */
    public int getRootBlockCount() {
      return rootCount;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key Key to find
     * @return Index of the block containing <code>key</code> (between 0 and
     *         the number of blocks - 1) or -1 if this file does not contain
     *         the requested key.
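     *
     * <p>For example (hypothetical keys), if the root keys are
     * {@code [b, d, f]}: looking up {@code c} returns 0 (the block starting
     * at {@code b}), looking up {@code d} returns 1, and looking up
     * {@code a} returns -1.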
     */
    public int rootBlockContainingKey(final byte[] key, int offset, int length) {
      int pos = Bytes.binarySearch(blockKeys, key, offset, length, comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key Key to find
     * @return Index of the block containing the given key, or -1 if this file
     *         does not contain the requested key.
     */
    public int rootBlockContainingKey(final Cell key) {
      int pos = Bytes.binarySearch(blockKeys, key, comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    /**
     * Adds a new entry to the root block index. Only used when reading.
     *
     * @param key Last key in the block
     * @param offset file offset where the block is stored
     * @param dataSize the uncompressed data size
     */
    private void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      blockKeys[rootCount] = key;
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    /**
     * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
     *
     * @param nonRootIndex the non-root index block buffer
     * @param i the ith position
     * @return The indexed key at the ith position in the nonRootIndex.
     */
    private byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
      int numEntries = nonRootIndex.getInt(0);
      if (i < 0 || i >= numEntries) {
        return null;
      }

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
      // The target key's offset relative to the end of secondary index
      int targetKeyRelOffset = nonRootIndex.getInt(
          Bytes.SIZEOF_INT * (i + 1));

      // The offset of the target key in the blockIndex buffer
      int targetKeyOffset = entriesOffset     // Skip secondary index
          + targetKeyRelOffset               // Skip all entries until mid
          + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

      // We subtract the two consecutive secondary index elements, which
      // gives us the size of the whole (offset, onDiskSize, key) tuple. We
      // then need to subtract the overhead of offset and onDiskSize.
      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
          targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

      return ByteBufferUtils.toBytes(nonRootIndex, targetKeyOffset, targetKeyLength);
    }

    /**
     * Performs a binary search over a non-root level index block. Utilizes the
     * secondary index, which records the offsets of (offset, onDiskSize,
     * firstKey) tuples of all entries.
     *
     * @param key the key we are searching for offsets to individual entries in
     *          the blockIndex buffer
     * @param nonRootIndex the non-root index block buffer, starting with the
     *          secondary index. The position is ignored.
     * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
     *         keys[i + 1], where keys is the array of all keys being searched,
     *         or -1 otherwise
     * @throws IOException
     */
    static int binarySearchNonRootIndex(Cell key, ByteBuffer nonRootIndex,
        KVComparator comparator) {

      int numEntries = nonRootIndex.getInt(0);
      int low = 0;
      int high = numEntries - 1;
      int mid = 0;

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // If we imagine that keys[-1] = -Infinity and
      // keys[numEntries] = Infinity, then we are maintaining an invariant that
      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
      KeyValue.KeyOnlyKeyValue nonRootIndexKV = new KeyValue.KeyOnlyKeyValue();
      while (low <= high) {
        mid = (low + high) >>> 1;

        // Midkey's offset relative to the end of secondary index
        int midKeyRelOffset = nonRootIndex.getInt(
            Bytes.SIZEOF_INT * (mid + 1));

        // The offset of the middle key in the blockIndex buffer
        int midKeyOffset = entriesOffset       // Skip secondary index
            + midKeyRelOffset                  // Skip all entries until mid
            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

        // We subtract the two consecutive secondary index elements, which
        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
        // then need to subtract the overhead of offset and onDiskSize.
        int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

        // we have to compare in this order, because the comparator order
        // has special logic when the 'left side' is a special key.
        // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
        // done after HBASE-12224 & HBASE-12282
        nonRootIndexKV.setKey(nonRootIndex.array(),
            nonRootIndex.arrayOffset() + midKeyOffset, midLength);
        int cmp = comparator.compareOnlyKeyPortion(key, nonRootIndexKV);

        // key lives above the midpoint
        if (cmp > 0)
          low = mid + 1; // Maintain the invariant that keys[low - 1] < key
        // key lives below the midpoint
        else if (cmp < 0)
          high = mid - 1; // Maintain the invariant that key < keys[high + 1]
        else
          return mid; // exact match
      }

      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
      // condition, low >= high + 1. Therefore, low = high + 1.

      if (low != high + 1) {
        throw new IllegalStateException("Binary search broken: low=" + low
            + " instead of " + (high + 1));
      }

      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
      int i = low - 1;

      // Some extra validation on the result.
      if (i < -1 || i >= numEntries) {
        throw new IllegalStateException("Binary search broken: result is " +
            i + " but expected to be between -1 and (numEntries - 1) = " +
            (numEntries - 1));
      }

      return i;
    }

    /**
     * Search for one key using the secondary index in a non-root block. In case
     * of success, positions the provided buffer at the entry of interest, where
     * the file offset and the on-disk size can be read.
     *
     * @param nonRootBlock a non-root block without header. Initial position
     *          does not matter.
     * @param key the key to search for
     * @return the index position where the given key was found, or -1 if the
     *         given key is before the first key in the block
     */
    static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, Cell key,
        KVComparator comparator) {
      int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);

      if (entryIndex != -1) {
        int numEntries = nonRootBlock.getInt(0);

        // The end of secondary index and the beginning of entries themselves.
        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

        // The offset of the entry we are interested in relative to the end of
        // the secondary index.
        int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT * (1 + entryIndex));

        nonRootBlock.position(entriesOffset + entryRelOffset);
      }

      return entryIndex;
    }

    /**
     * Read in the root-level index from the given input stream. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param in the buffered input stream or wrapped byte input stream
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readRootIndex(DataInput in, final int numEntries)
        throws IOException {
      blockOffsets = new long[numEntries];
      blockKeys = new byte[numEntries][];
      blockDataSizes = new int[numEntries];

      // If index size is zero, no index was written.
      if (numEntries > 0) {
        for (int i = 0; i < numEntries; ++i) {
          long offset = in.readLong();
          int dataSize = in.readInt();
          byte[] key = Bytes.readByteArray(in);
          add(key, offset, dataSize);
        }
      }
    }

    /**
     * Read in the root-level index from the given block. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @return the buffered input stream or wrapped byte input stream
     * @throws IOException
     */
    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
      DataInputStream in = blk.getByteStream();
      readRootIndex(in, numEntries);
      return in;
    }

    /**
     * Read the root-level metadata of a multi-level block index. Based on
     * {@link #readRootIndex(DataInput, int)}, but also reads metadata
     * necessary to compute the mid-key in a multi-level index.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readMultiLevelIndexRoot(HFileBlock blk,
        final int numEntries) throws IOException {
      DataInputStream in = readRootIndex(blk, numEntries);
      // After reading the root index, the checksum bytes have to
      // be subtracted to know if the mid-key metadata exists.
      int checkSumBytes = blk.totalChecksumBytes();
      if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
        // No mid-key metadata available.
        return;
      }
      midLeafBlockOffset = in.readLong();
      midLeafBlockOnDiskSize = in.readInt();
      midKeyEntry = in.readInt();
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=" + rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
            .append("\n  offset=").append(blockOffsets[i])
            .append(", dataSize=" + blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }

    @Override
    public long heapSize() {
      long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
          2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);

      // Mid-key metadata.
      heapSize += MID_KEY_METADATA_SIZE;

      // Calculating the size of blockKeys
      if (blockKeys != null) {
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
            * ClassSize.REFERENCE);

        // Adding bytes
        for (byte[] key : blockKeys) {
          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
        }
      }

      if (blockOffsets != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
            * Bytes.SIZEOF_LONG);
      }

      if (blockDataSizes != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
            * Bytes.SIZEOF_INT);
      }

      return ClassSize.align(heapSize);
    }

  }

  /**
   * Writes the block index into the output stream. Generates the tree from
   * bottom up. The leaf level is written to disk as a sequence of inline
   * blocks, if it is larger than a certain number of bytes. If the leaf level
   * is not large enough, we write all entries to the root level instead.
   *
   * After all leaf blocks have been written, we end up with an index
   * referencing the resulting leaf index blocks. If that index is larger than
   * the allowed root index size, the writer will break it up into
   * reasonable-size intermediate-level index block chunks, write those chunks
   * out, and create another index referencing those chunks. This will be
   * repeated until the remaining index is small enough to become the root
   * index. However, in most practical cases we will only have leaf-level
   * blocks and the root index, or just the root index.
   */
  public static class BlockIndexWriter implements InlineBlockWriter {
    /**
     * While the index is being written, this represents the current block
     * index referencing all leaf blocks, with one exception. If the file is
     * being closed and there are not enough blocks to complete even a single
     * leaf block, no leaf blocks get written and this contains the entire
     * block index. After all levels of the index were written by
     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
     * root-level index.
     */
    private BlockIndexChunk rootChunk = new BlockIndexChunk();

    /**
     * Current leaf-level chunk. New entries referencing data blocks get added
     * to this chunk until it grows large enough to be written to disk.
     */
    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();

    /**
     * The number of block index levels. This is one if there is only the root
     * level (even empty), two if there is a leaf level and a root level, and
     * higher if there are intermediate levels. This is only final after
     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
     * initial value accounts for the root level, and will be increased to two
     * as soon as we find out there is a leaf-level in
     * {@link #blockWritten(long, int, int)}.
     */
    private int numLevels = 1;

    private HFileBlock.Writer blockWriter;
    private byte[] firstKey = null;

    /**
     * The total number of leaf-level entries, i.e. entries referenced by
     * leaf-level blocks. For the data block index this is equal to the number
     * of data blocks.
     */
    private long totalNumEntries;

    /** Total compressed size of all index blocks. */
    private long totalBlockOnDiskSize;

    /** Total uncompressed size of all index blocks. */
    private long totalBlockUncompressedSize;

    /** The maximum size guideline of all multi-level index blocks. */
    private int maxChunkSize;

    /** The minimum number of entries in a single index block. */
    private int minIndexNumEntries;

    /** Whether we require this block index to always be single-level. */
    private boolean singleLevelOnly;

    /** CacheConfig, or null if cache-on-write is disabled */
    private CacheConfig cacheConf;

    /** Name to use for computing cache keys */
    private String nameForCaching;

    /** Creates a single-level block index writer */
    public BlockIndexWriter() {
      this(null, null, null);
      singleLevelOnly = true;
    }

    /**
     * Creates a multi-level block index writer.
     *
     * @param blockWriter the block writer to use to write index blocks
     * @param cacheConf used to determine when and how a block should be cached-on-write.
     */
    public BlockIndexWriter(HFileBlock.Writer blockWriter,
        CacheConfig cacheConf, String nameForCaching) {
      if ((cacheConf == null) != (nameForCaching == null)) {
        throw new IllegalArgumentException("Block cache and file name for " +
            "caching must be both specified or both null");
      }

      this.blockWriter = blockWriter;
      this.cacheConf = cacheConf;
      this.nameForCaching = nameForCaching;
      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
      this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
    }

    public void setMaxChunkSize(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("Invalid maximum index block size");
      }
      this.maxChunkSize = maxChunkSize;
    }

    public void setMinIndexNumEntries(int minIndexNumEntries) {
      if (minIndexNumEntries <= 1) {
        throw new IllegalArgumentException("Invalid minimum index number of entries, " +
            "should be >= 2");
      }
      this.minIndexNumEntries = minIndexNumEntries;
    }

    /**
     * Writes the root level and intermediate levels of the block index into
     * the output stream, generating the tree from bottom up. Assumes that the
     * leaf level has been inline-written to the disk if there is enough data
     * for more than one leaf block. We iterate by breaking the current level
     * of the block index, starting with the index of all leaf-level blocks,
     * into chunks small enough to be written to disk, and generate its parent
     * level, until we end up with a level small enough to become the root
     * level.
     *
     * If the leaf level is not large enough, there is no inline block index
     * anymore, so we only write that level of block index to disk as the root
     * level.
     *
     * @param out FSDataOutputStream
     * @return position at which we entered the root-level index.
     * @throws IOException
     */
    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
        throw new IOException("Trying to write a multi-level block index, " +
            "but there are " + curInlineChunk.getNumEntries() + " entries in the " +
            "last inline chunk.");
      }

      // We need to get mid-key metadata before we create intermediate
      // indexes and overwrite the root chunk.
      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
          : null;

      if (curInlineChunk != null) {
        while (rootChunk.getRootSize() > maxChunkSize
            // HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
            && rootChunk.getNumEntries() > minIndexNumEntries
            // Sanity check: we will not hit this, since (minIndexNumEntries ^ 16) blocks
            // can be addressed
            && numLevels < 16) {
          rootChunk = writeIntermediateLevel(out, rootChunk);
          numLevels += 1;
        }
      }

      // write the root level
      long rootLevelIndexPos = out.getPos();

      {
        DataOutput blockStream =
            blockWriter.startWriting(BlockType.ROOT_INDEX);
        rootChunk.writeRoot(blockStream);
        if (midKeyMetadata != null)
          blockStream.write(midKeyMetadata);
        blockWriter.writeHeaderAndData(out);
      }

      // Add root index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
          + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
          + " root-level entries, " + totalNumEntries + " total entries, "
          + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
          " on-disk size, "
          + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
          " total uncompressed size.");
      }
      return rootLevelIndexPos;
    }

    /**
     * Writes the block index data as a single level only. Does not do any
     * block framing.
     *
     * @param out the buffered output stream to write the index to. Typically a
     *          stream writing into an {@link HFile} block.
     * @param description a short description of the index being written. Used
     *          in a log message.
     * @throws IOException
     */
    public void writeSingleLevelIndex(DataOutput out, String description)
        throws IOException {
      expectNumLevels(1);

      if (!singleLevelOnly)
        throw new IOException("Single-level mode is turned off");

      if (rootChunk.getNumEntries() > 0)
        throw new IOException("Root-level entries already added in " +
            "single-level mode");

      rootChunk = curInlineChunk;
      curInlineChunk = new BlockIndexChunk();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a single-level " + description + " index with "
          + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
          + " bytes");
      }
      rootChunk.writeRoot(out);
    }

    /**
     * Split the current level of the block index into intermediate index
     * blocks of permitted size and write those blocks to disk. Return the next
     * level of the block index referencing those intermediate-level blocks.
     *
     * @param out the output stream to write the intermediate-level blocks to
     * @param currentLevel the current level of the block index, such as a
     *          chunk referencing all leaf-level index blocks
     * @return the parent level block index, which becomes the root index after
     *         a few (usually zero) iterations
     * @throws IOException
     */
    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
        BlockIndexChunk currentLevel) throws IOException {
      // Entries referencing intermediate-level blocks we are about to create.
      BlockIndexChunk parent = new BlockIndexChunk();

      // The current intermediate-level block index chunk.
      BlockIndexChunk curChunk = new BlockIndexChunk();

      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
        curChunk.add(currentLevel.getBlockKey(i),
            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));

        // HBASE-16288: We have to have at least minIndexNumEntries(16) items in the index so that
        // we won't end up with too many levels for an index with very large rowKeys. Also, if the
        // first key is larger than maxChunkSize this will cause infinite recursion.
        if (i >= minIndexNumEntries && curChunk.getRootSize() >= maxChunkSize) {
          writeIntermediateBlock(out, parent, curChunk);
        }
      }

      if (curChunk.getNumEntries() > 0) {
        writeIntermediateBlock(out, parent, curChunk);
      }

      return parent;
    }

    private void writeIntermediateBlock(FSDataOutputStream out,
        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
      long beginOffset = out.getPos();
      DataOutputStream dos = blockWriter.startWriting(
          BlockType.INTERMEDIATE_INDEX);
      curChunk.writeNonRoot(dos);
      byte[] curFirstKey = curChunk.getBlockKey(0);
      blockWriter.writeHeaderAndData(out);

      if (cacheConf != null) {
        HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
        cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
            beginOffset), blockForCaching);
      }

      // Add intermediate index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      // OFFSET is the beginning offset of the chunk of block index entries.
      // SIZE is the total byte size of the chunk of block index entries
      // + the secondary index size
      // FIRST_KEY is the first key in the chunk of block index
      // entries.
      parent.add(curFirstKey, beginOffset,
          blockWriter.getOnDiskSizeWithHeader());

      // clear current block index chunk
      curChunk.clear();
      curFirstKey = null;
    }

    /**
     * @return how many block index entries there are in the root level
     */
    public final int getNumRootEntries() {
      return rootChunk.getNumEntries();
    }

    /**
     * @return the number of levels in this block index.
     */
    public int getNumLevels() {
      return numLevels;
    }

    private void expectNumLevels(int expectedNumLevels) {
      if (numLevels != expectedNumLevels) {
        throw new IllegalStateException("Number of block index levels is "
            + numLevels + " but is expected to be " + expectedNumLevels);
      }
    }

    /**
     * Whether there is an inline block ready to be written. In general, we
     * write a leaf-level index block as an inline block as soon as its size
     * as serialized in the non-root format reaches a certain threshold.
     */
    @Override
    public boolean shouldWriteBlock(boolean closing) {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (curInlineChunk == null) {
        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
            "called with closing=true and then called again?");
      }

      if (curInlineChunk.getNumEntries() == 0) {
        return false;
      }

      // We do have some entries in the current inline chunk.
      if (closing) {
        if (rootChunk.getNumEntries() == 0) {
          // We did not add any leaf-level blocks yet. Instead of creating a
          // leaf level with one block, move these entries to the root level.

          expectNumLevels(1);
          rootChunk = curInlineChunk;
          curInlineChunk = null;  // Disallow adding any more index entries.
          return false;
        }

        return true;
      } else {
        return curInlineChunk.getNonRootSize() >= maxChunkSize;
      }
    }

    /**
     * Write out the current inline index block. Inline blocks are non-root
     * blocks, so the non-root index format is used.
     *
     * @param out the output stream to write the inline block to
     */
    @Override
    public void writeInlineBlock(DataOutput out) throws IOException {
      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      // Write the inline block index to the output stream in the non-root
      // index block format.
      curInlineChunk.writeNonRoot(out);

      // Save the first key of the inline block so that we can add it to the
      // parent-level index.
      firstKey = curInlineChunk.getBlockKey(0);

      // Start a new inline index block
      curInlineChunk.clear();
    }

    /**
     * Called after an inline block has been written so that we can add an
     * entry referring to that block to the parent-level index.
     */
    @Override
    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
      // Add leaf index block size
      totalBlockOnDiskSize += onDiskSize;
      totalBlockUncompressedSize += uncompressedSize;

      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      if (firstKey == null) {
        throw new IllegalStateException("Trying to add second-level index " +
            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
            " but the first key was not set in writeInlineBlock");
      }

      if (rootChunk.getNumEntries() == 0) {
        // We are writing the first leaf block, so increase index level.
        expectNumLevels(1);
        numLevels = 2;
      }

      // Add another entry to the second-level index. Include the number of
      // entries in all previous leaf-level chunks for mid-key calculation.
      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
      firstKey = null;
    }

    @Override
    public BlockType getInlineBlockType() {
      return BlockType.LEAF_INDEX;
    }

    /**
     * Add one index entry to the current leaf-level block. When the leaf-level
     * block gets large enough, it will be flushed to disk as an inline block.
     *
     * @param firstKey the first key of the data block
     * @param blockOffset the offset of the data block
     * @param blockDataSize the on-disk size of the data block ({@link HFile}
     *          format version 2), or the uncompressed size of the data block
     *          ({@link HFile} format version 1).
     */
    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
      ++totalNumEntries;
    }

    /**
     * @throws IOException if we happened to write a multi-level index.
     */
    public void ensureSingleLevel() throws IOException {
      if (numLevels > 1) {
        throw new IOException("Wrote a " + numLevels + "-level index with " +
            rootChunk.getNumEntries() + " root-level entries, but " +
            "this is expected to be a single-level block index.");
      }
    }

    /**
     * @return true if we are using cache-on-write. This is configured by the
     *         caller of the constructor by either passing a valid block cache
     *         or null.
     */
    @Override
    public boolean getCacheOnWrite() {
      return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
    }

    /**
     * The total uncompressed size of the root index block, intermediate-level
     * index blocks, and leaf-level index blocks.
     *
     * @return the total uncompressed size of all index blocks
     */
    public long getTotalUncompressedSize() {
      return totalBlockUncompressedSize;
    }

  }

  /**
   * A single chunk of the block index in the process of writing. The data in
   * this chunk can become a leaf-level, intermediate-level, or root index
   * block.
   */
  static class BlockIndexChunk {

    /** First keys of the key range corresponding to each index entry. */
    private final List<byte[]> blockKeys = new ArrayList<byte[]>();

    /** Block offset in backing stream. */
    private final List<Long> blockOffsets = new ArrayList<Long>();

    /** On-disk data sizes of lower-level data or index blocks. */
    private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();

    /**
     * The cumulative number of sub-entries, i.e. entries referenced by
     * deeper-level block index entries. numSubEntriesAt[i] is the number of
     * sub-entries in the blocks corresponding to this chunk's entries #0
     * through #i inclusively.
     */
    private final List<Long> numSubEntriesAt = new ArrayList<Long>();

    /**
     * The offset of the next entry to be added, relative to the end of the
     * "secondary index" in the "non-root" format representation of this index
     * chunk. This is the next value to be added to the secondary index.
     */
    private int curTotalNonRootEntrySize = 0;

    /**
     * The accumulated size of this chunk if stored in the root index format.
     */
    private int curTotalRootSize = 0;

    /**
     * The "secondary index" used for binary search over variable-length
     * records in a "non-root" format block. These offsets are relative to the
     * end of this secondary index.
     */
    private final List<Integer> secondaryIndexOffsetMarks =
        new ArrayList<Integer>();

    /**
     * Adds a new entry to this block index chunk.
     *
     * @param firstKey the first key in the block pointed to by this entry
     * @param blockOffset the offset of the next-level block pointed to by this
     *          entry
     * @param onDiskDataSize the on-disk data size of the block pointed to by
     *          this entry, including header size
     * @param curTotalNumSubEntries if this chunk is the root index chunk under
     *          construction, this specifies the current total number of
     *          sub-entries in all leaf-level chunks, including the one
     *          corresponding to the second-level entry being added.
     */
    void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
        long curTotalNumSubEntries) {
      // Record the offset for the secondary index
      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
          + firstKey.length;

      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;

      blockKeys.add(firstKey);
      blockOffsets.add(blockOffset);
      onDiskDataSizes.add(onDiskDataSize);

      if (curTotalNumSubEntries != -1) {
        numSubEntriesAt.add(curTotalNumSubEntries);

        // Make sure the parallel arrays are in sync.
        if (numSubEntriesAt.size() != blockKeys.size()) {
          throw new IllegalStateException("Only have key/value count " +
              "stats for " + numSubEntriesAt.size() + " block index " +
              "entries out of " + blockKeys.size());
        }
      }
    }

    /**
     * The same as {@link #add(byte[], long, int, long)} but does not take the
     * key/value count into account. Used for single-level indexes.
     *
     * @see #add(byte[], long, int, long)
     */
    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
      add(firstKey, blockOffset, onDiskDataSize, -1);
    }

    public void clear() {
      blockKeys.clear();
      blockOffsets.clear();
      onDiskDataSizes.clear();
      secondaryIndexOffsetMarks.clear();
      numSubEntriesAt.clear();
      curTotalNonRootEntrySize = 0;
      curTotalRootSize = 0;
    }

    /**
     * Finds the entry corresponding to the deeper-level index block containing
     * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
     * ordering of sub-entries.
     *
     * <p>
     * <i>Implementation note.</i> We are looking for i such that
     * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
     * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
     * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
     * sub-entries. i is by definition the insertion point of k in
     * numSubEntriesAt.
     *
     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
     * @return the 0-based index of the entry corresponding to the given
     *         sub-entry
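     *
     * <p>For example (hypothetical counts), if numSubEntriesAt = [3, 7, 10],
     * then sub-entries 0-2 belong to chunk entry 0, 3-6 to entry 1, and
     * 7-9 to entry 2; getEntryBySubEntry(5) returns 1.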
     */
    public int getEntryBySubEntry(long k) {
      // We define mid-key as the key corresponding to k'th sub-entry
      // (0-based).

      int i = Collections.binarySearch(numSubEntriesAt, k);

      // Exact match: numSubEntriesAt[i] = k. This means chunks #0 through
      // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
      // is in the (i + 1)'th chunk.
      if (i >= 0)
        return i + 1;

      // Inexact match. Return the insertion point.
      return -i - 1;
    }

    /**
     * Used when writing the root block index of a multi-level block index.
     * Serializes additional information that allows us to efficiently
     * identify the mid-key.
     *
     * @return a few serialized fields for finding the mid-key
     * @throws IOException if the metadata for computing the mid-key could not
     *           be created
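     *
     * <p>The serialized record consists of the three fields listed in the
     * {@code MID_KEY_METADATA_SIZE} javadoc, written in this order:
     *
     * <pre>
     *   mid leaf block offset (long) | its on-disk size (int) | mid-key entry index (int)
     * </pre>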
     */
    public byte[] getMidKeyMetadata() throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(
          MID_KEY_METADATA_SIZE);
      DataOutputStream baosDos = new DataOutputStream(baos);
      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
      if (totalNumSubEntries == 0) {
        throw new IOException("No leaf-level entries, mid-key unavailable");
      }
      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);

      baosDos.writeLong(blockOffsets.get(midKeyEntry));
      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));

      long numSubEntriesBefore = midKeyEntry > 0
          ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE) {
        throw new IOException("Could not identify mid-key index within the "
            + "leaf-level block containing mid-key: out of range ("
            + subEntryWithinEntry + ", numSubEntriesBefore="
            + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
            + ")");
      }

      baosDos.writeInt((int) subEntryWithinEntry);

      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
        throw new IOException("Could not write mid-key metadata: size=" +
            baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
      }

      // Close just to be good citizens, although this has no effect.
      baos.close();

      return baos.toByteArray();
    }

    /**
     * Writes the block index chunk in the non-root index block format. This
     * format contains the number of entries, an index of integer offsets
     * for quick binary search on variable-length records, and tuples of
     * block offset, on-disk block size, and the first key for each entry.
     *
     * @param out the data output stream to write the chunk to
     * @throws IOException
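     *
     * <p>Schematically (a sketch of the format described above):
     *
     * <pre>
     *   numEntries (int)
     *   secondary index: numEntries + 1 relative offsets (ints)
     *   numEntries x (block offset (long) | on-disk size (int) | first key bytes)
     * </pre>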
     */
    void writeNonRoot(DataOutput out) throws IOException {
      // The number of entries in the block.
      out.writeInt(blockKeys.size());

      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
        throw new IOException("Corrupted block index chunk writer: " +
            blockKeys.size() + " entries but " +
            secondaryIndexOffsetMarks.size() + " secondary index items");
      }

      // For each entry, write a "secondary index" of relative offsets to the
      // entries from the end of the secondary index. This works, because at
      // read time we read the number of entries and know where the secondary
      // index ends.
      for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
        out.writeInt(currentSecondaryIndex);

      // We include one other element in the secondary index to calculate the
      // size of each entry more easily by subtracting secondary index elements.
      out.writeInt(curTotalNonRootEntrySize);

      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        out.write(blockKeys.get(i));
      }
    }

    /**
     * @return the size of this chunk if stored in the non-root index block
     *         format
     */
    int getNonRootSize() {
      return Bytes.SIZEOF_INT                          // Number of entries
          + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
          + curTotalNonRootEntrySize;                  // All entries
    }

    /**
     * Writes this chunk into the given output stream in the root block index
     * format. This format is similar to the {@link HFile} version 1 block
     * index format, except that we store the on-disk size of the block
     * instead of its uncompressed size.
     *
     * @param out the data output stream to write the block index to. Typically
     *          a stream writing into an {@link HFile} block.
     * @throws IOException
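     *
     * <p>Schematically (a sketch of the format described above):
     *
     * <pre>
     *   numEntries x (block offset (long) | on-disk size (int) | vint-prefixed key bytes)
     * </pre>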
     */
    void writeRoot(DataOutput out) throws IOException {
      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        Bytes.writeByteArray(out, blockKeys.get(i));
      }
    }

    /**
     * @return the size of this chunk if stored in the root index block format
     */
    int getRootSize() {
      return curTotalRootSize;
    }

    /**
     * @return the number of entries in this block index chunk
     */
    public int getNumEntries() {
      return blockKeys.size();
    }

    public byte[] getBlockKey(int i) {
      return blockKeys.get(i);
    }

    public long getBlockOffset(int i) {
      return blockOffsets.get(i);
    }

    public int getOnDiskDataSize(int i) {
      return onDiskDataSizes.get(i);
    }

    public long getCumulativeNumKV(int i) {
      if (i < 0)
        return 0;
      return numSubEntriesAt.get(i);
    }

  }

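  /**
   * @return the maximum index block size configured via
   *         {@link #MAX_CHUNK_SIZE_KEY}, or {@link #DEFAULT_MAX_CHUNK_SIZE}
   *         if not set
   */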
  public static int getMaxChunkSize(Configuration conf) {
    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
  }

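  /**
   * @return the minimum number of entries per index block configured via
   *         {@link #MIN_INDEX_NUM_ENTRIES_KEY}, or
   *         {@link #DEFAULT_MIN_INDEX_NUM_ENTRIES} if not set
   */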
  public static int getMinIndexNumEntries(Configuration conf) {
    return conf.getInt(MIN_INDEX_NUM_ENTRIES_KEY, DEFAULT_MIN_INDEX_NUM_ENTRIES);
  }
}