/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * Provides functionality to write ({@link BlockIndexWriter}) and read
 * ({@link BlockIndexReader}) single-level and multi-level block indexes.
 *
 * Examples of how to use the block index writer can be found in
 * {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter} and
 * {@link HFileWriterV2}. Examples of how to use the reader can be
 * found in {@link HFileReaderV2} and TestHFileBlockIndex.
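 *
 * <p>A rough sketch of typical writer usage (the methods are from this class;
 * the surrounding block-writer and stream setup is illustrative only):
 *
 * <pre>
 * // Assumed setup: an HFileBlock.Writer, a CacheConfig, and an output stream.
 * BlockIndexWriter indexWriter =
 *     new BlockIndexWriter(blockWriter, cacheConf, fileName);
 * // For every data block written, record its first key, offset and size:
 * indexWriter.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
 * // ... at close time, write intermediate levels and the root level:
 * long rootIndexOffset = indexWriter.writeIndexBlocks(outputStream);
 * </pre>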
 */
@InterfaceAudience.Private
public class HFileBlockIndex {

  private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);

  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;

  /**
   * The maximum size guideline for index blocks (leaf, intermediate, and
   * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
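   *
   * <p>For example, to lower the guideline to 64 KB (a sketch; the resulting
   * index fan-out depends on key sizes):
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
   * </pre>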
   */
  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";

  /**
   * The number of bytes stored in each "secondary index" entry in addition to
   * key bytes in the non-root index block format. The first long is the file
   * offset of the deeper-level block the entry points to, and the int that
   * follows is that block's on-disk size without including header.
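   *
   * <p>Schematically, each non-root entry is laid out as:
   *
   * <pre>
   * long offset | int onDiskSizeWithoutHeader | key bytes
   * </pre>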
   */
  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
      + Bytes.SIZEOF_LONG;

  /**
   * Error message when trying to use inline block API in single-level mode.
   */
  private static final String INLINE_BLOCKS_NOT_ALLOWED =
      "Inline blocks are not allowed in the single-level-only mode";

  /**
   * The size of a meta-data record used for finding the mid-key in a
   * multi-level index. Consists of the middle leaf-level index block offset
   * (long), its on-disk size without header included (int), and the mid-key
   * entry's zero-based index in that leaf index block.
   */
  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
      2 * Bytes.SIZEOF_INT;

  /**
   * The reader will always hold the root level index in the memory. Index
   * blocks at all other levels will be cached in the LRU cache in practice,
   * although this API does not enforce that.
   *
   * All non-root (leaf and intermediate) index blocks contain what we call a
   * "secondary index": an array of offsets to the entries within the block.
   * This allows us to do binary search for the entry corresponding to the
   * given key without having to deserialize the block.
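   *
   * <p>The non-root block layout assumed throughout this reader is:
   *
   * <pre>
   * int numEntries
   * int[numEntries + 1] secondary index: offsets of the entries relative to
   *                     the end of the secondary index (the last element is
   *                     the total size of all entries)
   * entries: (long offset, int onDiskSize, key bytes) per entry
   * </pre>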
   */
  public static class BlockIndexReader implements HeapSize {
    /** Needed for doing lookups on blocks. */
    private final KVComparator comparator;

    // Root-level data.
    private byte[][] blockKeys;
    private long[] blockOffsets;
    private int[] blockDataSizes;
    private int rootCount = 0;

    // Mid-key metadata.
    private long midLeafBlockOffset = -1;
    private int midLeafBlockOnDiskSize = -1;
    private int midKeyEntry = -1;

    /** Pre-computed mid-key */
    private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();

    /**
     * The number of levels in the block index tree. One if there is only the
     * root level, two for root and leaf levels, etc.
     */
    private int searchTreeLevel;

    /** A way to read {@link HFile} blocks at a given offset */
    private CachingBlockReader cachingBlockReader;

    public BlockIndexReader(final KVComparator c, final int treeLevel,
        final CachingBlockReader cachingBlockReader) {
      this(c, treeLevel);
      this.cachingBlockReader = cachingBlockReader;
    }

    public BlockIndexReader(final KVComparator c, final int treeLevel) {
      comparator = c;
      searchTreeLevel = treeLevel;
    }

    /**
     * @return true if the block index is empty.
     */
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * Verifies that the block index is non-empty and throws an
     * {@link IllegalStateException} otherwise.
     */
    public void ensureNonEmpty() {
      if (blockKeys.length == 0) {
        throw new IllegalStateException("Block index is empty or not loaded");
      }
    }

    /**
     * Return the data block which contains this key. This function will only
     * be called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param cacheBlocks whether the blocks read should be added to the block cache
     * @param pread whether to use positional read (pread) instead of seek+read
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding
     * @return the data block containing the given key, or null if the key is
     *         not present in this file
     * @throws IOException
     */
    public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException {
      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
          cacheBlocks, pread, isCompaction, expectedDataBlockEncoding);
      if (blockWithScanInfo == null) {
        return null;
      } else {
        return blockWithScanInfo.getHFileBlock();
      }
    }

    /**
     * Return the BlockWithScanInfo which contains the DataBlock with other scan
     * info such as nextIndexedKey. This function will only be called when the
     * HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param cacheBlocks whether the blocks read should be added to the block cache
     * @param pread whether to use positional read (pread) instead of seek+read
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding.
     * @return the BlockWithScanInfo which contains the DataBlock with other
     *         scan info such as nextIndexedKey.
     * @throws IOException
     */
    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks, boolean pread, boolean isCompaction,
        DataBlockEncoding expectedDataBlockEncoding) throws IOException {
      int rootLevelIndex = rootBlockContainingKey(key);
      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
        return null;
      }

      // the next indexed key
      byte[] nextIndexedKey = null;

      // Read the next-level (intermediate or leaf) index block.
      long currentOffset = blockOffsets[rootLevelIndex];
      int currentOnDiskSize = blockDataSizes[rootLevelIndex];

      if (rootLevelIndex < blockKeys.length - 1) {
        nextIndexedKey = blockKeys[rootLevelIndex + 1];
      } else {
        nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY;
      }

      int lookupLevel = 1; // How many levels deep we are in our lookup.
      int index = -1;

      HFileBlock block;
      while (true) {

        if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
          // Avoid reading the same block again, even with caching turned off.
          // This is crucial for compaction-type workload which might have
          // caching turned off. This is like a one-block cache inside the
          // scanner.
          block = currentBlock;
        } else {
          // Call HFile's caching block reader API. We always cache index
          // blocks, otherwise we might get terrible performance.
          boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
          BlockType expectedBlockType;
          if (lookupLevel < searchTreeLevel - 1) {
            expectedBlockType = BlockType.INTERMEDIATE_INDEX;
          } else if (lookupLevel == searchTreeLevel - 1) {
            expectedBlockType = BlockType.LEAF_INDEX;
          } else {
            // this also accounts for ENCODED_DATA
            expectedBlockType = BlockType.DATA;
          }
          block = cachingBlockReader.readBlock(currentOffset,
              currentOnDiskSize, shouldCache, pread, isCompaction, true,
              expectedBlockType, expectedDataBlockEncoding);
        }

        if (block == null) {
          throw new IOException("Failed to read block at offset " +
              currentOffset + ", onDiskSize=" + currentOnDiskSize);
        }

        // Found a data block, break the loop and check our level in the tree.
        if (block.getBlockType().isData()) {
          break;
        }

        // Not a data block. This must be a leaf-level or intermediate-level
        // index block. We don't allow going deeper than searchTreeLevel.
        if (++lookupLevel > searchTreeLevel) {
          throw new IOException("Search Tree Level overflow: lookupLevel=" +
              lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
        }

        // Locate the entry corresponding to the given key in the non-root
        // (leaf or intermediate-level) index block.
        ByteBuffer buffer = block.getBufferWithoutHeader();
        index = locateNonRootIndexEntry(buffer, key, comparator);
        if (index == -1) {
          // TODO: this should operate on Cells directly; for now we convert
          // to a KeyValue just for the error message.
          KeyValue kv = KeyValueUtil.ensureKeyValue(key);
          throw new IOException("The key "
              + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
              + " is before the first key of the non-root index block " + block);
        }

        currentOffset = buffer.getLong();
        currentOnDiskSize = buffer.getInt();

        // Only update next indexed key if there is a next indexed key in the current level
        byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1);
        if (tmpNextIndexedKey != null) {
          nextIndexedKey = tmpNextIndexedKey;
        }
      }

      if (lookupLevel != searchTreeLevel) {
        throw new IOException("Reached a data block at level " + lookupLevel +
            " but the number of levels is " + searchTreeLevel);
      }

      // set the next indexed key for the current block.
      BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
      return blockWithScanInfo;
    }

    /**
     * An approximation to the {@link HFile}'s mid-key. Operates on block
     * boundaries, and does not go inside blocks. In other words, returns the
     * first key of the middle block of the file.
     *
     * @return the first key of the middle block
     */
    public byte[] midkey() throws IOException {
      if (rootCount == 0)
        throw new IOException("HFile empty");

      byte[] targetMidKey = this.midKey.get();
      if (targetMidKey != null) {
        return targetMidKey;
      }

      if (midLeafBlockOffset >= 0) {
        if (cachingBlockReader == null) {
          throw new IOException("Have to read the middle leaf block but " +
              "no block reader available");
        }

        // Caching, using pread, assuming this is not a compaction.
        HFileBlock midLeafBlock = cachingBlockReader.readBlock(
            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
            BlockType.LEAF_INDEX, null);

        ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
        int numDataBlocks = b.getInt();
        int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
        int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
            keyRelOffset;
        int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
            + SECONDARY_INDEX_ENTRY_OVERHEAD;
        targetMidKey = ByteBufferUtils.toBytes(b, keyOffset, keyLen);
      } else {
        // The middle of the root-level index.
        targetMidKey = blockKeys[rootCount / 2];
      }

      this.midKey.set(targetMidKey);
      return targetMidKey;
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public byte[] getRootBlockKey(int i) {
      return blockKeys[i];
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public long getRootBlockOffset(int i) {
      return blockOffsets[i];
    }

    /**
     * @param i zero-based index of a root-level block
     * @return the on-disk size of the root-level block for version 2, or the
     *         uncompressed size for version 1
     */
    public int getRootBlockDataSize(int i) {
      return blockDataSizes[i];
    }

    /**
     * @return the number of root-level blocks in this block index
     */
    public int getRootBlockCount() {
      return rootCount;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key Key to find
     * @return Index of the root-level block containing <code>key</code>
     *         (between 0 and the number of blocks - 1) or -1 if this file
     *         does not contain the request.
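     *
     *         <p>For example, with root keys {"c", "f", "k"}: key "a" returns
     *         -1, key "c" returns 0, and key "g" returns 1.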
     */
    public int rootBlockContainingKey(final byte[] key, int offset, int length) {
      int pos = Bytes.binarySearch(blockKeys, key, offset, length, comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key Key to find
     * @return index of the root-level block containing the key, or -1 if the
     *         key is before the first key in the file
     */
    public int rootBlockContainingKey(final Cell key) {
      int pos = Bytes.binarySearch(blockKeys, key, comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    /**
     * Adds a new entry to the root block index. Only used when reading.
     *
     * @param key First key in the block (root index entries store the first
     *          key of the block they reference)
     * @param offset file offset where the block is stored
     * @param dataSize the uncompressed data size
     */
    private void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      blockKeys[rootCount] = key;
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    /**
     * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
     *
     * @param nonRootIndex the non-root index block buffer
     * @param i the ith position
     * @return The indexed key at the ith position in the nonRootIndex, or
     *         null if i is out of range.
     */
    private byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
      int numEntries = nonRootIndex.getInt(0);
      if (i < 0 || i >= numEntries) {
        return null;
      }

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
      // Target key's offset relative to the end of secondary index
      int targetKeyRelOffset = nonRootIndex.getInt(
          Bytes.SIZEOF_INT * (i + 1));

      // The offset of the target key in the blockIndex buffer
      int targetKeyOffset = entriesOffset     // Skip secondary index
          + targetKeyRelOffset               // Skip all entries until mid
          + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

      // We subtract the two consecutive secondary index elements, which
      // gives us the size of the whole (offset, onDiskSize, key) tuple. We
      // then need to subtract the overhead of offset and onDiskSize.
      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
        targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

      return ByteBufferUtils.toBytes(nonRootIndex, targetKeyOffset, targetKeyLength);
    }

    /**
     * Performs a binary search over a non-root level index block. Utilizes the
     * secondary index, which records the offsets of (offset, onDiskSize,
     * firstKey) tuples of all entries.
     *
     * @param key the key we are searching for
     * @param nonRootIndex the non-root index block buffer, starting with the
     *          secondary index. The position is ignored.
     * @param comparator the comparator to use for key comparisons
     * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
     *         keys[i + 1], if keys is the array of all keys being searched, or
     *         -1 otherwise
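     *
     *         <p>To illustrate the offset arithmetic used below: with
     *         numEntries = 3, the secondary index occupies the 4 ints at byte
     *         offsets 4..19, so entries start at
     *         entriesOffset = 4 * (3 + 2) = 20. The key of entry i then starts
     *         at entriesOffset + secondaryIndex[i] + 12, where 12 is
     *         SECONDARY_INDEX_ENTRY_OVERHEAD (a long plus an int).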
     */
    static int binarySearchNonRootIndex(Cell key, ByteBuffer nonRootIndex,
        KVComparator comparator) {

      int numEntries = nonRootIndex.getInt(0);
      int low = 0;
      int high = numEntries - 1;
      int mid = 0;

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // If we imagine that keys[-1] = -Infinity and
      // keys[numEntries] = Infinity, then we are maintaining an invariant that
      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
      KeyValue.KeyOnlyKeyValue nonRootIndexKV = new KeyValue.KeyOnlyKeyValue();
      while (low <= high) {
        mid = (low + high) >>> 1;

        // Midkey's offset relative to the end of secondary index
        int midKeyRelOffset = nonRootIndex.getInt(
            Bytes.SIZEOF_INT * (mid + 1));

        // The offset of the middle key in the blockIndex buffer
        int midKeyOffset = entriesOffset       // Skip secondary index
            + midKeyRelOffset                  // Skip all entries until mid
            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

        // We subtract the two consecutive secondary index elements, which
        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
        // then need to subtract the overhead of offset and onDiskSize.
        int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

        // We have to compare in this order, because the comparator order
        // has special logic when the 'left side' is a special key.
        // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
        // done after HBASE-12224 & HBASE-12282
        nonRootIndexKV.setKey(nonRootIndex.array(),
            nonRootIndex.arrayOffset() + midKeyOffset, midLength);
        int cmp = comparator.compareOnlyKeyPortion(key, nonRootIndexKV);

        // key lives above the midpoint
        if (cmp > 0)
          low = mid + 1; // Maintain the invariant that keys[low - 1] < key
        // key lives below the midpoint
        else if (cmp < 0)
          high = mid - 1; // Maintain the invariant that key < keys[high + 1]
        else
          return mid; // exact match
      }

      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
      // condition, low >= high + 1. Therefore, low = high + 1.

      if (low != high + 1) {
        throw new IllegalStateException("Binary search broken: low=" + low
            + " instead of " + (high + 1));
      }

      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
      int i = low - 1;

      // Some extra validation on the result.
      if (i < -1 || i >= numEntries) {
        throw new IllegalStateException("Binary search broken: result is " +
            i + " but expected to be between -1 and (numEntries - 1) = " +
            (numEntries - 1));
      }

      return i;
    }

    /**
     * Search for one key using the secondary index in a non-root block. In case
     * of success, positions the provided buffer at the entry of interest, where
     * the file offset and the on-disk-size can be read.
     *
     * @param nonRootBlock a non-root block without header. Initial position
     *          does not matter.
     * @param key the key to search for
     * @param comparator the comparator to use for key comparisons
     * @return the index position where the given key was found, or -1 if the
     *         given key is before the first key in the block
     */
    static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, Cell key,
        KVComparator comparator) {
      int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);

      if (entryIndex != -1) {
        int numEntries = nonRootBlock.getInt(0);

        // The end of secondary index and the beginning of entries themselves.
        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

        // The offset of the entry we are interested in relative to the end of
        // the secondary index.
        int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT * (1 + entryIndex));

        nonRootBlock.position(entriesOffset + entryRelOffset);
      }

      return entryIndex;
    }

    /**
     * Read in the root-level index from the given input stream. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param in the buffered input stream or wrapped byte input stream
     * @param numEntries the number of root-level index entries
     * @throws IOException
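     *
     * <p>Each root-level entry read here is, in on-disk order:
     *
     * <pre>
     * long offset | int onDiskDataSize | vInt key length | key bytes
     * </pre>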
     */
    public void readRootIndex(DataInput in, final int numEntries)
        throws IOException {
      blockOffsets = new long[numEntries];
      blockKeys = new byte[numEntries][];
      blockDataSizes = new int[numEntries];

      // If index size is zero, no index was written.
      if (numEntries > 0) {
        for (int i = 0; i < numEntries; ++i) {
          long offset = in.readLong();
          int dataSize = in.readInt();
          byte[] key = Bytes.readByteArray(in);
          add(key, offset, dataSize);
        }
      }
    }

    /**
     * Read in the root-level index from the given block. Must match what was
     * written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @return the input stream positioned after the root-level index, for
     *         reading any metadata that follows
     * @throws IOException
     */
    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
      DataInputStream in = blk.getByteStream();
      readRootIndex(in, numEntries);
      return in;
    }

    /**
     * Read the root-level metadata of a multi-level block index. Based on
     * {@link #readRootIndex(DataInput, int)}, but also reads metadata
     * necessary to compute the mid-key in a multi-level index.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readMultiLevelIndexRoot(HFileBlock blk,
        final int numEntries) throws IOException {
      DataInputStream in = readRootIndex(blk, numEntries);
      // After reading the root index, the checksum bytes have to be
      // subtracted to know if the mid-key metadata exists.
      int checkSumBytes = blk.totalChecksumBytes();
      if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
        // No mid-key metadata available.
        return;
      }
      midLeafBlockOffset = in.readLong();
      midLeafBlockOnDiskSize = in.readInt();
      midKeyEntry = in.readInt();
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=" + rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
            .append("\n  offset=").append(blockOffsets[i])
            .append(", dataSize=" + blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }

    @Override
    public long heapSize() {
      long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
          2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);

      // Mid-key metadata.
      heapSize += MID_KEY_METADATA_SIZE;

      // Calculating the size of blockKeys
      if (blockKeys != null) {
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
            * ClassSize.REFERENCE);

        // Adding bytes
        for (byte[] key : blockKeys) {
          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
        }
      }

      if (blockOffsets != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
            * Bytes.SIZEOF_LONG);
      }

      if (blockDataSizes != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
            * Bytes.SIZEOF_INT);
      }

      return ClassSize.align(heapSize);
    }

  }

  /**
   * Writes the block index into the output stream. Generates the tree from
   * bottom up. The leaf level is written to disk as a sequence of inline
   * blocks, if it is larger than a certain number of bytes. If the leaf level
   * is not large enough, we write all entries to the root level instead.
   *
   * After all leaf blocks have been written, we end up with an index
   * referencing the resulting leaf index blocks. If that index is larger than
   * the allowed root index size, the writer will break it up into
   * reasonable-size intermediate-level index block chunks, write those chunks
   * out, and create another index referencing those chunks. This will be
   * repeated until the remaining index is small enough to become the root
   * index. However, in most practical cases we will only have leaf-level
   * blocks and the root index, or just the root index.
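   *
   * <p>A sketch of the writing sequence (driven by the enclosing HFile
   * writer; the names of the streams are illustrative only):
   *
   * <pre>
   * // One index entry per data block:
   * indexWriter.addEntry(firstKey, blockOffset, blockOnDiskSize);
   * // Periodically, the inline-block protocol flushes full leaf chunks:
   * if (indexWriter.shouldWriteBlock(false)) {
   *   indexWriter.writeInlineBlock(out); // followed by blockWritten(...)
   * }
   * // At close time, intermediate levels and the root are written:
   * long rootOffset = indexWriter.writeIndexBlocks(fsOut);
   * </pre>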
   */
  public static class BlockIndexWriter implements InlineBlockWriter {
    /**
     * While the index is being written, this represents the current block
     * index referencing all leaf blocks, with one exception. If the file is
     * being closed and there are not enough blocks to complete even a single
     * leaf block, no leaf blocks get written and this contains the entire
     * block index. After all levels of the index were written by
     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
     * root-level index.
     */
    private BlockIndexChunk rootChunk = new BlockIndexChunk();

    /**
     * Current leaf-level chunk. New entries referencing data blocks get added
     * to this chunk until it grows large enough to be written to disk.
     */
    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();

    /**
     * The number of block index levels. This is one if there is only the root
     * level (even empty), two if there are a leaf level and a root level, and
     * is higher if there are intermediate levels. This is only final after
     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
     * initial value accounts for the root level, and will be increased to two
     * as soon as we find out there is a leaf-level in
     * {@link #blockWritten(long, int, int)}.
     */
    private int numLevels = 1;

    private HFileBlock.Writer blockWriter;
    private byte[] firstKey = null;

    /**
     * The total number of leaf-level entries, i.e. entries referenced by
     * leaf-level blocks. For the data block index this is equal to the number
     * of data blocks.
     */
    private long totalNumEntries;

    /** Total compressed size of all index blocks. */
    private long totalBlockOnDiskSize;

    /** Total uncompressed size of all index blocks. */
    private long totalBlockUncompressedSize;

    /** The maximum size guideline of all multi-level index blocks. */
    private int maxChunkSize;

    /** Whether we require this block index to always be single-level. */
    private boolean singleLevelOnly;

    /** CacheConfig, or null if cache-on-write is disabled */
    private CacheConfig cacheConf;

    /** Name to use for computing cache keys */
    private String nameForCaching;

    /** Creates a single-level block index writer */
    public BlockIndexWriter() {
      this(null, null, null);
      singleLevelOnly = true;
    }

    /**
     * Creates a multi-level block index writer.
     *
     * @param blockWriter the block writer to use to write index blocks
     * @param cacheConf used to determine when and how a block should be cached-on-write.
     * @param nameForCaching the name to use for computing cache keys
     */
    public BlockIndexWriter(HFileBlock.Writer blockWriter,
        CacheConfig cacheConf, String nameForCaching) {
      if ((cacheConf == null) != (nameForCaching == null)) {
        throw new IllegalArgumentException("Block cache and file name for " +
            "caching must be both specified or both null");
      }

      this.blockWriter = blockWriter;
      this.cacheConf = cacheConf;
      this.nameForCaching = nameForCaching;
      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
    }

    public void setMaxChunkSize(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("Invalid maximum index block size");
      }
      this.maxChunkSize = maxChunkSize;
    }

    /**
     * Writes the root level and intermediate levels of the block index into
     * the output stream, generating the tree from bottom up. Assumes that the
     * leaf level has been inline-written to the disk if there is enough data
     * for more than one leaf block. We iterate by breaking the current level
     * of the block index, starting with the index of all leaf-level blocks,
     * into chunks small enough to be written to disk, and generate its parent
     * level, until we end up with a level small enough to become the root
     * level.
     *
     * If the leaf level is not large enough, there is no inline block index
     * anymore, so we only write that level of block index to disk as the root
     * level.
     *
     * @param out the output stream to write the index blocks into
     * @return the position at which the root-level index starts
     * @throws IOException
     */
    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
        throw new IOException("Trying to write a multi-level block index, " +
            "but there are " + curInlineChunk.getNumEntries() + " entries in " +
            "the last inline chunk.");
      }

      // We need to get mid-key metadata before we create intermediate
      // indexes and overwrite the root chunk.
      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
          : null;

      if (curInlineChunk != null) {
        while (rootChunk.getRootSize() > maxChunkSize) {
          rootChunk = writeIntermediateLevel(out, rootChunk);
          numLevels += 1;
        }
      }

      // write the root level
      long rootLevelIndexPos = out.getPos();

      {
        DataOutput blockStream =
            blockWriter.startWriting(BlockType.ROOT_INDEX);
        rootChunk.writeRoot(blockStream);
        if (midKeyMetadata != null)
          blockStream.write(midKeyMetadata);
        blockWriter.writeHeaderAndData(out);
      }

      // Add root index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
          + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
          + " root-level entries, " + totalNumEntries + " total entries, "
          + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
          " on-disk size, "
          + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
          " total uncompressed size.");
      }
      return rootLevelIndexPos;
    }

    /**
     * Writes the block index data as a single level only. Does not do any
     * block framing.
     *
     * @param out the buffered output stream to write the index to. Typically a
     *          stream writing into an {@link HFile} block.
     * @param description a short description of the index being written. Used
     *          in a log message.
     * @throws IOException
     */
    public void writeSingleLevelIndex(DataOutput out, String description)
        throws IOException {
      expectNumLevels(1);

      if (!singleLevelOnly)
        throw new IOException("Single-level mode is turned off");

      if (rootChunk.getNumEntries() > 0)
        throw new IOException("Root-level entries already added in " +
            "single-level mode");

      rootChunk = curInlineChunk;
      curInlineChunk = new BlockIndexChunk();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a single-level " + description + " index with "
          + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
          + " bytes");
      }
      rootChunk.writeRoot(out);
    }

    /**
     * Split the current level of the block index into intermediate index
     * blocks of permitted size and write those blocks to disk. Return the next
     * level of the block index referencing those intermediate-level blocks.
     *
     * @param out the output stream to write the intermediate-level blocks to
     * @param currentLevel the current level of the block index, such as the
     *          chunk referencing all leaf-level index blocks
     * @return the parent level block index, which becomes the root index after
     *         a few (usually zero) iterations
     * @throws IOException
     */
    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
        BlockIndexChunk currentLevel) throws IOException {
      // Entries referencing intermediate-level blocks we are about to create.
      BlockIndexChunk parent = new BlockIndexChunk();

      // The current intermediate-level block index chunk.
      BlockIndexChunk curChunk = new BlockIndexChunk();

      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
        curChunk.add(currentLevel.getBlockKey(i),
            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));

        if (curChunk.getRootSize() >= maxChunkSize)
          writeIntermediateBlock(out, parent, curChunk);
      }

      if (curChunk.getNumEntries() > 0) {
        writeIntermediateBlock(out, parent, curChunk);
      }

      return parent;
    }

    private void writeIntermediateBlock(FSDataOutputStream out,
        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
      long beginOffset = out.getPos();
      DataOutputStream dos = blockWriter.startWriting(
          BlockType.INTERMEDIATE_INDEX);
      curChunk.writeNonRoot(dos);
      byte[] curFirstKey = curChunk.getBlockKey(0);
      blockWriter.writeHeaderAndData(out);

      if (cacheConf != null) {
        HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
        cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
          beginOffset), blockForCaching);
      }

      // Add intermediate index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      // OFFSET is the beginning offset of the chunk of block index entries.
      // SIZE is the total byte size of the chunk of block index entries
      // + the secondary index size
      // FIRST_KEY is the first key in the chunk of block index
      // entries.
      parent.add(curFirstKey, beginOffset,
          blockWriter.getOnDiskSizeWithHeader());

      // clear current block index chunk
      curChunk.clear();
      curFirstKey = null;
    }

    /**
     * @return how many block index entries there are in the root level
     */
    public final int getNumRootEntries() {
      return rootChunk.getNumEntries();
    }

    /**
     * @return the number of levels in this block index.
     */
    public int getNumLevels() {
      return numLevels;
    }

    private void expectNumLevels(int expectedNumLevels) {
      if (numLevels != expectedNumLevels) {
        throw new IllegalStateException("Number of block index levels is "
            + numLevels + " but is expected to be " + expectedNumLevels);
      }
    }

    /**
     * Whether there is an inline block ready to be written. In general, we
     * write a leaf-level index block as an inline block as soon as its size
     * as serialized in the non-root format reaches a certain threshold.
     */
    @Override
    public boolean shouldWriteBlock(boolean closing) {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (curInlineChunk == null) {
        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
            "called with closing=true and then called again?");
      }

      if (curInlineChunk.getNumEntries() == 0) {
        return false;
      }

      // We do have some entries in the current inline chunk.
      if (closing) {
        if (rootChunk.getNumEntries() == 0) {
          // We did not add any leaf-level blocks yet. Instead of creating a
          // leaf level with one block, move these entries to the root level.

          expectNumLevels(1);
          rootChunk = curInlineChunk;
          curInlineChunk = null;  // Disallow adding any more index entries.
          return false;
        }

        return true;
      } else {
        return curInlineChunk.getNonRootSize() >= maxChunkSize;
      }
    }

    /**
     * Write out the current inline index block. Inline blocks are non-root
     * blocks, so the non-root index format is used.
     *
     * @param out the output stream to write the inline block to
     */
    @Override
    public void writeInlineBlock(DataOutput out) throws IOException {
      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      // Write the inline block index to the output stream in the non-root
      // index block format.
      curInlineChunk.writeNonRoot(out);

      // Save the first key of the inline block so that we can add it to the
      // parent-level index.
      firstKey = curInlineChunk.getBlockKey(0);

      // Start a new inline index block
      curInlineChunk.clear();
    }

    /**
     * Called after an inline block has been written so that we can add an
     * entry referring to that block to the parent-level index.
     */
    @Override
    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
      // Add leaf index block size
      totalBlockOnDiskSize += onDiskSize;
      totalBlockUncompressedSize += uncompressedSize;

      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      if (firstKey == null) {
        throw new IllegalStateException("Trying to add second-level index " +
            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
            " but the first key was not set in writeInlineBlock");
      }

      if (rootChunk.getNumEntries() == 0) {
        // We are writing the first leaf block, so increase index level.
        expectNumLevels(1);
        numLevels = 2;
      }

      // Add another entry to the second-level index. Include the number of
      // entries in all previous leaf-level chunks for mid-key calculation.
      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
      firstKey = null;
    }

    @Override
    public BlockType getInlineBlockType() {
      return BlockType.LEAF_INDEX;
    }

    /**
     * Add one index entry to the current leaf-level block. When the leaf-level
     * block gets large enough, it will be flushed to disk as an inline block.
     *
     * @param firstKey the first key of the data block
     * @param blockOffset the offset of the data block
     * @param blockDataSize the on-disk size of the data block ({@link HFile}
     *          format version 2), or the uncompressed size of the data block (
     *          {@link HFile} format version 1).
     */
    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
      ++totalNumEntries;
    }

    /**
     * @throws IOException if we happened to write a multi-level index.
     */
    public void ensureSingleLevel() throws IOException {
      if (numLevels > 1) {
        throw new IOException("Wrote a " + numLevels + "-level index with " +
            rootChunk.getNumEntries() + " root-level entries, but " +
            "this is expected to be a single-level block index.");
      }
    }

    /**
     * @return true if we are using cache-on-write. This is configured by the
     *         caller of the constructor by either passing a valid block cache
     *         or null.
     */
    @Override
    public boolean getCacheOnWrite() {
      return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
    }

    /**
     * The total uncompressed size of the root index block, intermediate-level
     * index blocks, and leaf-level index blocks.
     *
     * @return the total uncompressed size of all index blocks
     */
    public long getTotalUncompressedSize() {
      return totalBlockUncompressedSize;
    }

  }

  /**
   * A single chunk of the block index in the process of writing. The data in
   * this chunk can become a leaf-level, intermediate-level, or root index
   * block.
   */
  static class BlockIndexChunk {

    /** First keys of the key range corresponding to each index entry. */
    private final List<byte[]> blockKeys = new ArrayList<byte[]>();

    /** Block offset in backing stream. */
    private final List<Long> blockOffsets = new ArrayList<Long>();

    /** On-disk data sizes of lower-level data or index blocks. */
    private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();

    /**
     * The cumulative number of sub-entries, i.e. entries on deeper-level block
     * index entries. numSubEntriesAt[i] is the number of sub-entries in the
     * blocks corresponding to this chunk's entries #0 through #i inclusively.
     */
    private final List<Long> numSubEntriesAt = new ArrayList<Long>();

    /**
     * The offset of the next entry to be added, relative to the end of the
     * "secondary index" in the "non-root" format representation of this index
     * chunk. This is the next value to be added to the secondary index.
     */
    private int curTotalNonRootEntrySize = 0;

    /**
     * The accumulated size of this chunk if stored in the root index format.
     */
    private int curTotalRootSize = 0;

    /**
     * The "secondary index" used for binary search over variable-length
     * records in a "non-root" format block. These offsets are relative to the
     * end of this secondary index.
     */
    private final List<Integer> secondaryIndexOffsetMarks =
        new ArrayList<Integer>();

    /**
     * Adds a new entry to this block index chunk.
     *
     * @param firstKey the first key in the block pointed to by this entry
     * @param blockOffset the offset of the next-level block pointed to by this
     *          entry
     * @param onDiskDataSize the on-disk data of the block pointed to by this
     *          entry, including header size
     * @param curTotalNumSubEntries if this chunk is the root index chunk under
     *          construction, this specifies the current total number of
     *          sub-entries in all leaf-level chunks, including the one
     *          corresponding to the second-level entry being added.
     */
    void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
        long curTotalNumSubEntries) {
      // Record the offset for the secondary index
      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
          + firstKey.length;

      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;

      blockKeys.add(firstKey);
      blockOffsets.add(blockOffset);
      onDiskDataSizes.add(onDiskDataSize);

      if (curTotalNumSubEntries != -1) {
        numSubEntriesAt.add(curTotalNumSubEntries);

        // Make sure the parallel arrays are in sync.
        if (numSubEntriesAt.size() != blockKeys.size()) {
          throw new IllegalStateException("Only have key/value count " +
              "stats for " + numSubEntriesAt.size() + " block index " +
              "entries out of " + blockKeys.size());
        }
      }
    }

    /**
     * The same as {@link #add(byte[], long, int, long)} but does not take the
     * key/value count into account. Used for single-level indexes.
     *
     * @see #add(byte[], long, int, long)
     */
    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
      add(firstKey, blockOffset, onDiskDataSize, -1);
    }

    public void clear() {
      blockKeys.clear();
      blockOffsets.clear();
      onDiskDataSizes.clear();
      secondaryIndexOffsetMarks.clear();
      numSubEntriesAt.clear();
      curTotalNonRootEntrySize = 0;
      curTotalRootSize = 0;
    }

    /**
     * Finds the entry corresponding to the deeper-level index block containing
     * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
     * ordering of sub-entries.
     *
     * <p>
     * <i>Implementation note.</i> We are looking for i such that
     * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
     * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
     * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
     * sub-entries. i is by definition the insertion point of k in
     * numSubEntriesAt.
     *
     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
     * @return the 0-based index of the entry corresponding to the given
     *         sub-entry
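     *
     *         <p>For example, if the leaf chunks hold 3, 4, and 3 sub-entries,
     *         then numSubEntriesAt = [3, 7, 10], and k = 5 falls in entry 1
     *         while k = 7 falls in entry 2.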
     */
    public int getEntryBySubEntry(long k) {
      // We define mid-key as the key corresponding to k'th sub-entry
      // (0-based).

      int i = Collections.binarySearch(numSubEntriesAt, k);

      // Exact match: cumulativeWeight[i] = k. This means chunks #0 through
      // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
      // is in the (i + 1)'th chunk.
      if (i >= 0)
        return i + 1;

      // Inexact match. Return the insertion point.
      return -i - 1;
    }

    /**
     * Used when writing the root block index of a multi-level block index.
     * Serializes additional information that allows the mid-key to be
     * identified efficiently.
     *
     * @return a few serialized fields for finding the mid-key
     * @throws IOException if could not create metadata for computing mid-key
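     *
     * <p>The three fields serialized here, in order, are the mid leaf block
     * offset (long), its on-disk size without header (int), and the mid-key
     * entry's index within that leaf block (int), matching
     * MID_KEY_METADATA_SIZE.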
     */
    public byte[] getMidKeyMetadata() throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(
          MID_KEY_METADATA_SIZE);
      DataOutputStream baosDos = new DataOutputStream(baos);
      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
      if (totalNumSubEntries == 0) {
        throw new IOException("No leaf-level entries, mid-key unavailable");
      }
      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);

      baosDos.writeLong(blockOffsets.get(midKeyEntry));
      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));

      long numSubEntriesBefore = midKeyEntry > 0
          ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE) {
        throw new IOException("Could not identify mid-key index within the "
            + "leaf-level block containing mid-key: out of range ("
            + subEntryWithinEntry + ", numSubEntriesBefore="
            + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
            + ")");
      }

      baosDos.writeInt((int) subEntryWithinEntry);

      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
        throw new IOException("Could not write mid-key metadata: size=" +
            baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
      }

      // Close just to be good citizens, although this has no effect.
      baos.close();

      return baos.toByteArray();
    }

    /**
     * Writes the block index chunk in the non-root index block format. This
     * format contains the number of entries, an index of integer offsets
     * for quick binary search on variable-length records, and tuples of
     * block offset, on-disk block size, and the first key for each entry.
     *
     * @param out the output stream to write the chunk to
     * @throws IOException
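     *
     * <p>Concretely, the serialized layout is:
     *
     * <pre>
     * int numEntries
     * int[numEntries + 1] relative entry offsets (secondary index)
     * per entry: long blockOffset | int onDiskDataSize | key bytes
     * </pre>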
     */
    void writeNonRoot(DataOutput out) throws IOException {
      // The number of entries in the block.
      out.writeInt(blockKeys.size());

      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
        throw new IOException("Corrupted block index chunk writer: " +
            blockKeys.size() + " entries but " +
            secondaryIndexOffsetMarks.size() + " secondary index items");
      }

      // For each entry, write a "secondary index" of relative offsets to the
      // entries from the end of the secondary index. This works, because at
      // read time we read the number of entries and know where the secondary
      // index ends.
      for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
        out.writeInt(currentSecondaryIndex);

      // We include one other element in the secondary index to calculate the
      // size of each entry more easily by subtracting secondary index elements.
      out.writeInt(curTotalNonRootEntrySize);

      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        out.write(blockKeys.get(i));
      }
    }

    /**
     * @return the size of this chunk if stored in the non-root index block
     *         format
     */
    int getNonRootSize() {
      return Bytes.SIZEOF_INT                          // Number of entries
          + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
          + curTotalNonRootEntrySize;                  // All entries
    }

    /**
     * Writes this chunk into the given output stream in the root block index
     * format. This format is similar to the {@link HFile} version 1 block
     * index format, except that we store on-disk size of the block instead of
     * its uncompressed size.
     *
     * @param out the data output stream to write the block index to. Typically
     *          a stream writing into an {@link HFile} block.
     * @throws IOException
     */
    void writeRoot(DataOutput out) throws IOException {
      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        Bytes.writeByteArray(out, blockKeys.get(i));
      }
    }

    /**
     * @return the size of this chunk if stored in the root index block format
     */
    int getRootSize() {
      return curTotalRootSize;
    }

    /**
     * @return the number of entries in this block index chunk
     */
    public int getNumEntries() {
      return blockKeys.size();
    }

    public byte[] getBlockKey(int i) {
      return blockKeys.get(i);
    }

    public long getBlockOffset(int i) {
      return blockOffsets.get(i);
    }

    public int getOnDiskDataSize(int i) {
      return onDiskDataSizes.get(i);
    }

    public long getCumulativeNumKV(int i) {
      if (i < 0)
        return 0;
      return numSubEntriesAt.get(i);
    }

  }

  public static int getMaxChunkSize(Configuration conf) {
    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
  }
}