/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompoundBloomFilterWriter;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * Provides functionality to write ({@link BlockIndexWriter}) and read
 * ({@link BlockIndexReader}) single-level and multi-level block indexes.
 *
 * Examples of how to use the block index writer can be found in
 * {@link CompoundBloomFilterWriter} and {@link HFileWriterV2}. Examples of how
 * to use the reader can be found in {@link HFileReaderV2} and
 * TestHFileBlockIndex.
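 *
 * As a rough sketch of the single-level write path (the surrounding
 * variables here are assumed for illustration, not taken from those
 * classes):
 *
 * <pre>{@code
 * HFileBlockIndex.BlockIndexWriter indexWriter =
 *     new HFileBlockIndex.BlockIndexWriter(); // single-level-only mode
 * // For each data block: first key, file offset, and on-disk size are
 * // assumed to come from the enclosing writer.
 * indexWriter.addEntry(firstKey, blockOffset, blockOnDiskSize);
 * indexWriter.writeSingleLevelIndex(out, "data"); // out is a DataOutput
 * }</pre>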
 */
@InterfaceAudience.Private
public class HFileBlockIndex {

  private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);

  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;

  /**
   * The maximum size guideline for index blocks (leaf, intermediate, and
   * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
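   *
   * <p>For example, {@code conf.setInt(MAX_CHUNK_SIZE_KEY, 64 * 1024)}
   * lowers the guideline to 64 KB; {@link #getMaxChunkSize(Configuration)}
   * reads the configured value back.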
   */
  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";

  /**
   * The number of bytes stored in each "secondary index" entry in addition to
   * key bytes in the non-root index block format. The first long is the file
   * offset of the deeper-level block the entry points to, and the int that
   * follows is that block's on-disk size without including header.
   */
  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
      + Bytes.SIZEOF_LONG;

  /**
   * Error message when trying to use inline block API in single-level mode.
   */
  private static final String INLINE_BLOCKS_NOT_ALLOWED =
      "Inline blocks are not allowed in the single-level-only mode";

  /**
   * The size of a meta-data record used for finding the mid-key in a
   * multi-level index. Consists of the middle leaf-level index block offset
   * (long), its on-disk size without header included (int), and the mid-key
   * entry's zero-based index in that leaf index block.
   */
  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
      2 * Bytes.SIZEOF_INT;

  /**
   * The reader will always hold the root level index in the memory. Index
   * blocks at all other levels will be cached in the LRU cache in practice,
   * although this API does not enforce that.
   *
   * All non-root (leaf and intermediate) index blocks contain what we call a
   * "secondary index": an array of offsets to the entries within the block.
   * This allows us to do binary search for the entry corresponding to the
   * given key without having to deserialize the block.
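   *
   * For reference, the non-root block layout that the secondary index is
   * part of (as serialized by
   * {@link BlockIndexChunk#writeNonRoot(DataOutput)}) is:
   *
   * <pre>
   * int:          number of entries (n)
   * int[n + 1]:   secondary index: entry offsets relative to the end of
   *               this array, plus one extra element for the total size
   * n times:      (long blockOffset, int onDiskSize, key bytes)
   * </pre>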
   */
  public static class BlockIndexReader implements HeapSize {
    /** Needed doing lookup on blocks. */
    private final KVComparator comparator;

    // Root-level data.
    private byte[][] blockKeys;
    private long[] blockOffsets;
    private int[] blockDataSizes;
    private int rootCount = 0;

    // Mid-key metadata.
    private long midLeafBlockOffset = -1;
    private int midLeafBlockOnDiskSize = -1;
    private int midKeyEntry = -1;

    /** Pre-computed mid-key */
    private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();

    /**
     * The number of levels in the block index tree. One if there is only root
     * level, two for root and leaf levels, etc.
     */
    private int searchTreeLevel;

    /** A way to read {@link HFile} blocks at a given offset */
    private CachingBlockReader cachingBlockReader;

    public BlockIndexReader(final KVComparator c, final int treeLevel,
        final CachingBlockReader cachingBlockReader) {
      this(c, treeLevel);
      this.cachingBlockReader = cachingBlockReader;
    }

    public BlockIndexReader(final KVComparator c, final int treeLevel) {
      comparator = c;
      searchTreeLevel = treeLevel;
    }

    /**
     * @return true if the block index is empty.
     */
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * Verifies that the block index is non-empty and throws an
     * {@link IllegalStateException} otherwise.
     */
    public void ensureNonEmpty() {
      if (blockKeys.length == 0) {
        throw new IllegalStateException("Block index is empty or not loaded");
      }
    }

    /**
     * Return the data block which contains this key. This function will only
     * be called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param cacheBlocks whether to cache the block on read
     * @param pread whether to use positional (as opposed to seek+read) reads
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding
     * @return the data block containing the given key, or null if the key is
     *         not present in this file
     * @throws IOException
     */
    public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException {
      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
          cacheBlocks,
          pread, isCompaction, expectedDataBlockEncoding);
      if (blockWithScanInfo == null) {
        return null;
      } else {
        return blockWithScanInfo.getHFileBlock();
      }
    }

    /**
     * Return the BlockWithScanInfo which contains the DataBlock with other scan
     * info such as nextIndexedKey. This function will only be called when the
     * HFile version is larger than 1.
     *
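     * A minimal lookup sketch, assuming an already-initialized reader
     * (argument values are illustrative):
     *
     * <pre>{@code
     * // No current block, cache the read, use pread, not a compaction,
     * // and skip the encoding check:
     * BlockWithScanInfo info = indexReader.loadDataBlockWithScanInfo(
     *     cell, null, true, true, false, null);
     * HFileBlock dataBlock = (info == null) ? null : info.getHFileBlock();
     * }</pre>
     *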
     * @param key
     *          the key we are looking for
     * @param currentBlock
     *          the current block, to avoid re-reading the same block
     * @param cacheBlocks whether to cache the block on read
     * @param pread whether to use positional reads
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding.
     * @return the BlockWithScanInfo which contains the DataBlock with other
     *         scan info such as nextIndexedKey.
     * @throws IOException
     */
    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException {
      int rootLevelIndex = rootBlockContainingKey(key);
      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
        return null;
      }

      // the next indexed key
      byte[] nextIndexedKey = null;

      // Read the next-level (intermediate or leaf) index block.
      long currentOffset = blockOffsets[rootLevelIndex];
      int currentOnDiskSize = blockDataSizes[rootLevelIndex];

      if (rootLevelIndex < blockKeys.length - 1) {
        nextIndexedKey = blockKeys[rootLevelIndex + 1];
      } else {
        nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY;
      }

      int lookupLevel = 1; // How many levels deep we are in our lookup.
      int index = -1;

      HFileBlock block;
      while (true) {

        if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
          // Avoid reading the same block again, even with caching turned off.
          // This is crucial for compaction-type workload which might have
          // caching turned off. This is like a one-block cache inside the
          // scanner.
          block = currentBlock;
        } else {
          // Call HFile's caching block reader API. We always cache index
          // blocks, otherwise we might get terrible performance.
          boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
          BlockType expectedBlockType;
          if (lookupLevel < searchTreeLevel - 1) {
            expectedBlockType = BlockType.INTERMEDIATE_INDEX;
          } else if (lookupLevel == searchTreeLevel - 1) {
            expectedBlockType = BlockType.LEAF_INDEX;
          } else {
            // this also accounts for ENCODED_DATA
            expectedBlockType = BlockType.DATA;
          }
          block = cachingBlockReader.readBlock(currentOffset,
              currentOnDiskSize, shouldCache, pread, isCompaction, true,
              expectedBlockType, expectedDataBlockEncoding);
        }

        if (block == null) {
          throw new IOException("Failed to read block at offset " +
              currentOffset + ", onDiskSize=" + currentOnDiskSize);
        }

        // Found a data block, break the loop and check our level in the tree.
        if (block.getBlockType().isData()) {
          break;
        }

        // Not a data block. This must be a leaf-level or intermediate-level
        // index block. We don't allow going deeper than searchTreeLevel.
        if (++lookupLevel > searchTreeLevel) {
          throw new IOException("Search Tree Level overflow: lookupLevel=" +
              lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
        }

        // Locate the entry corresponding to the given key in the non-root
        // (leaf or intermediate-level) index block.
        ByteBuffer buffer = block.getBufferWithoutHeader();
        index = locateNonRootIndexEntry(buffer, key, comparator);
        if (index == -1) {
          // This has to be changed
          // For now change this to key value
          KeyValue kv = KeyValueUtil.ensureKeyValue(key);
          throw new IOException("The key "
              + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
              + " is before the first key of the non-root index block "
              + block);
        }

        currentOffset = buffer.getLong();
        currentOnDiskSize = buffer.getInt();

        // Only update next indexed key if there is a next indexed key in the current level
        byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1);
        if (tmpNextIndexedKey != null) {
          nextIndexedKey = tmpNextIndexedKey;
        }
      }

      if (lookupLevel != searchTreeLevel) {
        throw new IOException("Reached a data block at level " + lookupLevel +
            " but the number of levels is " + searchTreeLevel);
      }

      // set the next indexed key for the current block.
      BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
      return blockWithScanInfo;
    }

    /**
     * An approximation to the {@link HFile}'s mid-key. Operates on block
     * boundaries, and does not go inside blocks. In other words, returns the
     * first key of the middle block of the file.
     *
     * @return the first key of the middle block
     */
    public byte[] midkey() throws IOException {
      if (rootCount == 0)
        throw new IOException("HFile empty");

      byte[] targetMidKey = this.midKey.get();
      if (targetMidKey != null) {
        return targetMidKey;
      }

      if (midLeafBlockOffset >= 0) {
        if (cachingBlockReader == null) {
          throw new IOException("Have to read the middle leaf block but " +
              "no block reader available");
        }

        // Caching, using pread, assuming this is not a compaction.
        HFileBlock midLeafBlock = cachingBlockReader.readBlock(
            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
            BlockType.LEAF_INDEX, null);

        ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
        int numDataBlocks = b.getInt();
        int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
        // The difference of two consecutive secondary index elements is the
        // size of the whole (offset, onDiskSize, key) tuple, so subtract the
        // overhead of the offset and on-disk size to get the key length,
        // mirroring getNonRootIndexedKey().
        int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
            keyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
        int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
            + SECONDARY_INDEX_ENTRY_OVERHEAD;
        targetMidKey = ByteBufferUtils.toBytes(b, keyOffset, keyLen);
      } else {
        // The middle of the root-level index.
        targetMidKey = blockKeys[rootCount / 2];
      }

      this.midKey.set(targetMidKey);
      return targetMidKey;
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public byte[] getRootBlockKey(int i) {
      return blockKeys[i];
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public long getRootBlockOffset(int i) {
      return blockOffsets[i];
    }

    /**
     * @param i zero-based index of a root-level block
     * @return the on-disk size of the root-level block for version 2, or the
     *         uncompressed size for version 1
     */
    public int getRootBlockDataSize(int i) {
      return blockDataSizes[i];
    }

    /**
     * @return the number of root-level blocks in this block index
     */
    public int getRootBlockCount() {
      return rootCount;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
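     * For example, if the root-level keys are {B, D, F}, a lookup for C
     * returns 0 (the block starting at B), a lookup for D returns 1 (an
     * exact match), and a lookup for A returns -1 because A precedes the
     * first key.
     *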
     * @param key
     *          Key to find
     * @return Index of the root-level block containing <code>key</code>
     *         (between 0 and the number of blocks - 1) or -1 if this file
     *         does not contain the requested key.
     */
    public int rootBlockContainingKey(final byte[] key, int offset, int length) {
      int pos = Bytes.binarySearch(blockKeys, key, offset, length, comparator);
      // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key
     *          Key to find
     * @return the index of the root-level block containing the given key, or
     *         -1 if the key precedes the first key in the file
     */
    public int rootBlockContainingKey(final Cell key) {
      int pos = Bytes.binarySearch(blockKeys, key, comparator);
      // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    /**
     * Adds a new entry in the root block index. Only used when reading.
     *
     * @param key First key in the block
     * @param offset file offset where the block is stored
     * @param dataSize the on-disk data size ({@link HFile} format version 2)
     *          or the uncompressed data size (version 1)
     */
    private void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      blockKeys[rootCount] = key;
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    /**
     * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
     *
     * @param nonRootIndex the non-root index block buffer
     * @param i the ith position
     * @return The indexed key at the ith position in the nonRootIndex.
     */
    private byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
      int numEntries = nonRootIndex.getInt(0);
      if (i < 0 || i >= numEntries) {
        return null;
      }

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
      // Target key's offset relative to the end of secondary index
      int targetKeyRelOffset = nonRootIndex.getInt(
          Bytes.SIZEOF_INT * (i + 1));

      // The offset of the target key in the blockIndex buffer
      int targetKeyOffset = entriesOffset     // Skip secondary index
          + targetKeyRelOffset               // Skip all entries until mid
          + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

      // We subtract the two consecutive secondary index elements, which
      // gives us the size of the whole (offset, onDiskSize, key) tuple. We
      // then need to subtract the overhead of offset and onDiskSize.
      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
        targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

      return ByteBufferUtils.toBytes(nonRootIndex, targetKeyOffset, targetKeyLength);
    }

    /**
     * Performs a binary search over a non-root level index block. Utilizes the
     * secondary index, which records the offsets of (offset, onDiskSize,
     * firstKey) tuples of all entries.
     *
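     * As a worked example of the offset arithmetic used below: with
     * numEntries = 3, the secondary index occupies ints #1 through #4 of the
     * buffer, so the entries start at byte offset 4 * (3 + 2) = 20; the key
     * of entry #1 then starts at 20 + secondaryIndex[1] +
     * SECONDARY_INDEX_ENTRY_OVERHEAD, i.e. 12 bytes past the start of its
     * tuple, skipping the long offset and the int on-disk size.
     *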
     * @param key
     *          the key we are searching for
     * @param nonRootIndex
     *          the non-root index block buffer, starting with the secondary
     *          index. The position is ignored.
     * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
     *         keys[i + 1], if keys is the array of all keys being searched, or
     *         -1 otherwise
     */
    static int binarySearchNonRootIndex(Cell key, ByteBuffer nonRootIndex,
        KVComparator comparator) {

      int numEntries = nonRootIndex.getInt(0);
      int low = 0;
      int high = numEntries - 1;
      int mid = 0;

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // If we imagine that keys[-1] = -Infinity and
      // keys[numEntries] = Infinity, then we are maintaining an invariant that
      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
      KeyValue.KeyOnlyKeyValue nonRootIndexKV = new KeyValue.KeyOnlyKeyValue();
      while (low <= high) {
        mid = (low + high) >>> 1;

        // Midkey's offset relative to the end of secondary index
        int midKeyRelOffset = nonRootIndex.getInt(
            Bytes.SIZEOF_INT * (mid + 1));

        // The offset of the middle key in the blockIndex buffer
        int midKeyOffset = entriesOffset       // Skip secondary index
            + midKeyRelOffset                  // Skip all entries until mid
            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

        // We subtract the two consecutive secondary index elements, which
        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
        // then need to subtract the overhead of offset and onDiskSize.
        int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

        // we have to compare in this order, because the comparator order
        // has special logic when the 'left side' is a special key.
        // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
        // done after HBASE-12224 & HBASE-12282
        nonRootIndexKV.setKey(nonRootIndex.array(),
            nonRootIndex.arrayOffset() + midKeyOffset, midLength);
        int cmp = comparator.compareOnlyKeyPortion(key, nonRootIndexKV);

        // key lives above the midpoint
        if (cmp > 0)
          low = mid + 1; // Maintain the invariant that keys[low - 1] < key
        // key lives below the midpoint
        else if (cmp < 0)
          high = mid - 1; // Maintain the invariant that key < keys[high + 1]
        else
          return mid; // exact match
      }

      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
      // condition, low >= high + 1. Therefore, low = high + 1.

      if (low != high + 1) {
        throw new IllegalStateException("Binary search broken: low=" + low
            + " instead of " + (high + 1));
      }

      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
      int i = low - 1;

      // Some extra validation on the result.
      if (i < -1 || i >= numEntries) {
        throw new IllegalStateException("Binary search broken: result is " +
            i + " but expected to be between -1 and (numEntries - 1) = " +
            (numEntries - 1));
      }

      return i;
    }

    /**
     * Search for one key using the secondary index in a non-root block. In case
     * of success, positions the provided buffer at the entry of interest, where
     * the file offset and the on-disk-size can be read.
     *
     * @param nonRootBlock
     *          a non-root block without header. Initial position does not
     *          matter.
     * @param key
     *          the key we are searching for
     * @return the index position where the given key was found, otherwise
     *         return -1 in the case the given key is before the first key.
     */
    static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, Cell key,
        KVComparator comparator) {
      int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);

      if (entryIndex != -1) {
        int numEntries = nonRootBlock.getInt(0);

        // The end of secondary index and the beginning of entries themselves.
        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

        // The offset of the entry we are interested in relative to the end of
        // the secondary index.
        int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT * (1 + entryIndex));

        nonRootBlock.position(entriesOffset + entryRelOffset);
      }

      return entryIndex;
    }

    /**
     * Read in the root-level index from the given input stream. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param in the buffered input stream or wrapped byte input stream
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readRootIndex(DataInput in, final int numEntries)
        throws IOException {
      blockOffsets = new long[numEntries];
      blockKeys = new byte[numEntries][];
      blockDataSizes = new int[numEntries];

      // If index size is zero, no index was written.
      if (numEntries > 0) {
        for (int i = 0; i < numEntries; ++i) {
          long offset = in.readLong();
          int dataSize = in.readInt();
          byte[] key = Bytes.readByteArray(in);
          add(key, offset, dataSize);
        }
      }
    }

    /**
     * Read in the root-level index from the given block. Must match what was
     * written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @return the input stream positioned past the root-level index, so that
     *         any metadata that follows can be read
     * @throws IOException
     */
    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
      DataInputStream in = blk.getByteStream();
      readRootIndex(in, numEntries);
      return in;
    }

    /**
     * Read the root-level metadata of a multi-level block index. Based on
     * {@link #readRootIndex(DataInput, int)}, but also reads metadata
     * necessary to compute the mid-key in a multi-level index.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readMultiLevelIndexRoot(HFileBlock blk,
        final int numEntries) throws IOException {
      DataInputStream in = readRootIndex(blk, numEntries);
      // after reading the root index the checksum bytes have to
      // be subtracted to know if the mid key exists.
      int checkSumBytes = blk.totalChecksumBytes();
      if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
        // No mid-key metadata available.
        return;
      }
      midLeafBlockOffset = in.readLong();
      midLeafBlockOnDiskSize = in.readInt();
      midKeyEntry = in.readInt();
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=").append(rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
            .append("\n  offset=").append(blockOffsets[i])
            .append(", dataSize=").append(blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }

    @Override
    public long heapSize() {
      long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
          2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);

      // Mid-key metadata.
      heapSize += MID_KEY_METADATA_SIZE;

      // Calculating the size of blockKeys
      if (blockKeys != null) {
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
            * ClassSize.REFERENCE);

        // Adding bytes
        for (byte[] key : blockKeys) {
          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
        }
      }

      if (blockOffsets != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
            * Bytes.SIZEOF_LONG);
      }

      if (blockDataSizes != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
            * Bytes.SIZEOF_INT);
      }

      return ClassSize.align(heapSize);
    }

  }

  /**
   * Writes the block index into the output stream. Generates the tree from
   * bottom up. The leaf level is written to disk as a sequence of inline
   * blocks, if it is larger than a certain number of bytes. If the leaf level
   * is not large enough, we write all entries to the root level instead.
   *
   * After all leaf blocks have been written, we end up with an index
   * referencing the resulting leaf index blocks. If that index is larger than
   * the allowed root index size, the writer will break it up into
   * reasonable-size intermediate-level index block chunks, write those chunks
   * out, and create another index referencing those chunks. This will be
   * repeated until the remaining index is small enough to become the root
   * index. However, in most practical cases we will only have leaf-level
   * blocks and the root index, or just the root index.
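   *
   * A rough sketch of the multi-level write cycle, as driven through the
   * {@link InlineBlockWriter} contract (the surrounding data-block loop, the
   * stream {@code out}, and the block geometry variables are assumed):
   *
   * <pre>{@code
   * indexWriter.addEntry(firstKey, blockOffset, blockOnDiskSize);
   * if (indexWriter.shouldWriteBlock(false)) {
   *   DataOutput dos = blockWriter.startWriting(
   *       indexWriter.getInlineBlockType());
   *   indexWriter.writeInlineBlock(dos);
   *   blockWriter.writeHeaderAndData(out);
   *   indexWriter.blockWritten(offset, onDiskSize, uncompressedSize);
   * }
   * // At close time, after the last inline block:
   * long rootIndexOffset = indexWriter.writeIndexBlocks(out);
   * }</pre>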
   */
  public static class BlockIndexWriter implements InlineBlockWriter {
    /**
     * While the index is being written, this represents the current block
     * index referencing all leaf blocks, with one exception. If the file is
     * being closed and there are not enough blocks to complete even a single
     * leaf block, no leaf blocks get written and this contains the entire
     * block index. After all levels of the index were written by
     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
     * root-level index.
     */
    private BlockIndexChunk rootChunk = new BlockIndexChunk();

    /**
     * Current leaf-level chunk. New entries referencing data blocks get added
     * to this chunk until it grows large enough to be written to disk.
     */
    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();

    /**
     * The number of block index levels. This is one if there is only the root
     * level (even empty), two if there is a leaf level and a root level, and
     * is higher if there are intermediate levels. This is only final after
     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
     * initial value accounts for the root level, and will be increased to two
     * as soon as we find out there is a leaf-level in
     * {@link #blockWritten(long, int, int)}.
     */
    private int numLevels = 1;

    private HFileBlock.Writer blockWriter;
    private byte[] firstKey = null;

    /**
     * The total number of leaf-level entries, i.e. entries referenced by
     * leaf-level blocks. For the data block index this is equal to the number
     * of data blocks.
     */
    private long totalNumEntries;

    /** Total compressed size of all index blocks. */
    private long totalBlockOnDiskSize;

    /** Total uncompressed size of all index blocks. */
    private long totalBlockUncompressedSize;

    /** The maximum size guideline of all multi-level index blocks. */
    private int maxChunkSize;

    /** Whether we require this block index to always be single-level. */
    private boolean singleLevelOnly;

    /** CacheConfig, or null if cache-on-write is disabled */
    private CacheConfig cacheConf;

    /** Name to use for computing cache keys */
    private String nameForCaching;

    /** Creates a single-level block index writer */
    public BlockIndexWriter() {
      this(null, null, null);
      singleLevelOnly = true;
    }

    /**
     * Creates a multi-level block index writer.
     *
     * @param blockWriter the block writer to use to write index blocks
     * @param cacheConf used to determine when and how a block should be cached-on-write.
     */
    public BlockIndexWriter(HFileBlock.Writer blockWriter,
        CacheConfig cacheConf, String nameForCaching) {
      if ((cacheConf == null) != (nameForCaching == null)) {
        throw new IllegalArgumentException("Block cache and file name for " +
            "caching must be both specified or both null");
      }

      this.blockWriter = blockWriter;
      this.cacheConf = cacheConf;
      this.nameForCaching = nameForCaching;
      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
    }

    public void setMaxChunkSize(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("Invalid maximum index block size");
      }
      this.maxChunkSize = maxChunkSize;
    }

    /**
     * Writes the root level and intermediate levels of the block index into
     * the output stream, generating the tree from bottom up. Assumes that the
     * leaf level has been inline-written to the disk if there is enough data
     * for more than one leaf block. We iterate by breaking the current level
     * of the block index, starting with the index of all leaf-level blocks,
     * into chunks small enough to be written to disk, and generate its parent
     * level, until we end up with a level small enough to become the root
     * level.
     *
     * If the leaf level is not large enough, there is no inline block index
     * anymore, so we only write that level of block index to disk as the root
     * level.
     *
     * @param out FSDataOutputStream
     * @return position at which we entered the root-level index.
     * @throws IOException
     */
    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
        throw new IOException("Trying to write a multi-level block index, " +
            "but there are " + curInlineChunk.getNumEntries() + " entries in " +
            "the last inline chunk.");
      }

      // We need to get mid-key metadata before we create intermediate
      // indexes and overwrite the root chunk.
      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
          : null;

      if (curInlineChunk != null) {
        while (rootChunk.getRootSize() > maxChunkSize) {
          rootChunk = writeIntermediateLevel(out, rootChunk);
          numLevels += 1;
        }
      }

      // write the root level
      long rootLevelIndexPos = out.getPos();

      {
        DataOutput blockStream =
            blockWriter.startWriting(BlockType.ROOT_INDEX);
        rootChunk.writeRoot(blockStream);
        if (midKeyMetadata != null)
          blockStream.write(midKeyMetadata);
        blockWriter.writeHeaderAndData(out);
      }

      // Add root index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
          + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
          + " root-level entries, " + totalNumEntries + " total entries, "
          + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
          " on-disk size, "
          + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
          " total uncompressed size.");
      }
      return rootLevelIndexPos;
    }

    /**
     * Writes the block index data as a single level only. Does not do any
     * block framing.
     *
     * @param out the buffered output stream to write the index to. Typically a
     *          stream writing into an {@link HFile} block.
     * @param description a short description of the index being written. Used
     *          in a log message.
     * @throws IOException
     */
    public void writeSingleLevelIndex(DataOutput out, String description)
        throws IOException {
      expectNumLevels(1);

      if (!singleLevelOnly)
        throw new IOException("Single-level mode is turned off");

      if (rootChunk.getNumEntries() > 0)
        throw new IOException("Root-level entries already added in " +
            "single-level mode");

      rootChunk = curInlineChunk;
      curInlineChunk = new BlockIndexChunk();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a single-level " + description + " index with "
          + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
          + " bytes");
      }
      rootChunk.writeRoot(out);
    }

    /**
     * Split the current level of the block index into intermediate index
     * blocks of permitted size and write those blocks to disk. Return the next
     * level of the block index referencing those intermediate-level blocks.
     *
     * @param out the output stream to write the intermediate blocks to
     * @param currentLevel the current level of the block index, such as the
     *          chunk referencing all leaf-level index blocks
     * @return the parent level block index, which becomes the root index after
     *         a few (usually zero) iterations
     * @throws IOException
     */
    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
        BlockIndexChunk currentLevel) throws IOException {
      // Entries referencing intermediate-level blocks we are about to create.
      BlockIndexChunk parent = new BlockIndexChunk();

      // The current intermediate-level block index chunk.
      BlockIndexChunk curChunk = new BlockIndexChunk();

      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
        curChunk.add(currentLevel.getBlockKey(i),
            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));

        if (curChunk.getRootSize() >= maxChunkSize)
          writeIntermediateBlock(out, parent, curChunk);
      }

      if (curChunk.getNumEntries() > 0) {
        writeIntermediateBlock(out, parent, curChunk);
      }

      return parent;
    }

    private void writeIntermediateBlock(FSDataOutputStream out,
        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
      long beginOffset = out.getPos();
      DataOutputStream dos = blockWriter.startWriting(
          BlockType.INTERMEDIATE_INDEX);
      curChunk.writeNonRoot(dos);
      byte[] curFirstKey = curChunk.getBlockKey(0);
      blockWriter.writeHeaderAndData(out);

      if (cacheConf != null) {
        HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
        cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
          beginOffset), blockForCaching);
      }

      // Add intermediate index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      // OFFSET is the beginning offset of the chunk of block index entries.
      // SIZE is the total byte size of the chunk of block index entries
      // + the secondary index size
      // FIRST_KEY is the first key in the chunk of block index
      // entries.
      parent.add(curFirstKey, beginOffset,
          blockWriter.getOnDiskSizeWithHeader());

      // clear current block index chunk
      curChunk.clear();
      curFirstKey = null;
    }

    /**
     * @return how many block index entries there are in the root level
     */
    public final int getNumRootEntries() {
      return rootChunk.getNumEntries();
    }

    /**
     * @return the number of levels in this block index.
     */
    public int getNumLevels() {
      return numLevels;
    }

    private void expectNumLevels(int expectedNumLevels) {
      if (numLevels != expectedNumLevels) {
        throw new IllegalStateException("Number of block index levels is "
            + numLevels + " but is expected to be " + expectedNumLevels);
      }
    }

    /**
     * Whether there is an inline block ready to be written. In general, we
     * write a leaf-level index block as an inline block as soon as its size
     * as serialized in the non-root format reaches a certain threshold.
     */
    @Override
    public boolean shouldWriteBlock(boolean closing) {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (curInlineChunk == null) {
        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
            "called with closing=true and then called again?");
      }

      if (curInlineChunk.getNumEntries() == 0) {
        return false;
      }

      // We do have some entries in the current inline chunk.
      if (closing) {
        if (rootChunk.getNumEntries() == 0) {
          // We did not add any leaf-level blocks yet. Instead of creating a
          // leaf level with one block, move these entries to the root level.

          expectNumLevels(1);
          rootChunk = curInlineChunk;
          curInlineChunk = null;  // Disallow adding any more index entries.
          return false;
        }

        return true;
      } else {
        return curInlineChunk.getNonRootSize() >= maxChunkSize;
      }
    }

    /**
     * Write out the current inline index block. Inline blocks are non-root
     * blocks, so the non-root index format is used.
     *
     * @param out the output stream to write the inline block to
     */
    @Override
    public void writeInlineBlock(DataOutput out) throws IOException {
      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      // Write the inline block index to the output stream in the non-root
      // index block format.
      curInlineChunk.writeNonRoot(out);

      // Save the first key of the inline block so that we can add it to the
      // parent-level index.
      firstKey = curInlineChunk.getBlockKey(0);

      // Start a new inline index block
      curInlineChunk.clear();
    }

    /**
     * Called after an inline block has been written so that we can add an
     * entry referring to that block to the parent-level index.
     */
    @Override
    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
      // Add leaf index block size
      totalBlockOnDiskSize += onDiskSize;
      totalBlockUncompressedSize += uncompressedSize;

      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      if (firstKey == null) {
        throw new IllegalStateException("Trying to add second-level index " +
            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
            " but the first key was not set in writeInlineBlock");
      }

      if (rootChunk.getNumEntries() == 0) {
        // We are writing the first leaf block, so increase index level.
        expectNumLevels(1);
        numLevels = 2;
      }

      // Add another entry to the second-level index. Include the number of
      // entries in all previous leaf-level chunks for mid-key calculation.
      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
      firstKey = null;
    }

    @Override
    public BlockType getInlineBlockType() {
      return BlockType.LEAF_INDEX;
    }

    /**
     * Add one index entry to the current leaf-level block. When the leaf-level
     * block gets large enough, it will be flushed to disk as an inline block.
     *
     * @param firstKey the first key of the data block
     * @param blockOffset the offset of the data block
     * @param blockDataSize the on-disk size of the data block ({@link HFile}
     *          format version 2), or the uncompressed size of the data block (
     *          {@link HFile} format version 1).
     */
    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
      ++totalNumEntries;
    }

    /**
     * @throws IOException if we happened to write a multi-level index.
     */
    public void ensureSingleLevel() throws IOException {
      if (numLevels > 1) {
        throw new IOException("Wrote a " + numLevels + "-level index with " +
            rootChunk.getNumEntries() + " root-level entries, but " +
            "this is expected to be a single-level block index.");
      }
    }

    /**
     * @return true if we are using cache-on-write. This is configured by the
     *         caller of the constructor by either passing a valid block cache
     *         or null.
     */
    @Override
    public boolean getCacheOnWrite() {
      return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
    }

    /**
     * The total uncompressed size of the root index block, intermediate-level
     * index blocks, and leaf-level index blocks.
     *
     * @return the total uncompressed size of all index blocks
     */
    public long getTotalUncompressedSize() {
      return totalBlockUncompressedSize;
    }

  }

  /**
   * A single chunk of the block index in the process of writing. The data in
   * this chunk can become a leaf-level, intermediate-level, or root index
   * block.
   */
  static class BlockIndexChunk {

    /** First keys of the key range corresponding to each index entry. */
    private final List<byte[]> blockKeys = new ArrayList<byte[]>();

    /** Block offset in backing stream. */
    private final List<Long> blockOffsets = new ArrayList<Long>();

    /** On-disk data sizes of lower-level data or index blocks. */
    private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();

    /**
     * The cumulative number of sub-entries, i.e. entries in the deeper-level
     * blocks that this chunk's entries point to. numSubEntriesAt[i] is the
     * number of sub-entries in the blocks corresponding to this chunk's
     * entries #0 through #i inclusively.
     */
    private final List<Long> numSubEntriesAt = new ArrayList<Long>();

    /**
     * The offset of the next entry to be added, relative to the end of the
     * "secondary index" in the "non-root" format representation of this index
     * chunk. This is the next value to be added to the secondary index.
     */
    private int curTotalNonRootEntrySize = 0;

    /**
     * The accumulated size of this chunk if stored in the root index format.
     */
    private int curTotalRootSize = 0;

    /**
     * The "secondary index" used for binary search over variable-length
     * records in a "non-root" format block. These offsets are relative to the
     * end of this secondary index.
     */
    private final List<Integer> secondaryIndexOffsetMarks =
        new ArrayList<Integer>();

    /**
     * Adds a new entry to this block index chunk.
     *
     * @param firstKey the first key in the block pointed to by this entry
     * @param blockOffset the offset of the next-level block pointed to by this
     *          entry
     * @param onDiskDataSize the on-disk data size of the block pointed to by
     *          this entry, including header size
     * @param curTotalNumSubEntries if this chunk is the root index chunk under
     *          construction, this specifies the current total number of
     *          sub-entries in all leaf-level chunks, including the one
     *          corresponding to the second-level entry being added.
     */
    void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
        long curTotalNumSubEntries) {
      // Record the offset for the secondary index
      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
          + firstKey.length;

      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
          + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;

      blockKeys.add(firstKey);
      blockOffsets.add(blockOffset);
      onDiskDataSizes.add(onDiskDataSize);

      if (curTotalNumSubEntries != -1) {
        numSubEntriesAt.add(curTotalNumSubEntries);

        // Make sure the parallel arrays are in sync.
        if (numSubEntriesAt.size() != blockKeys.size()) {
          throw new IllegalStateException("Only have key/value count " +
              "stats for " + numSubEntriesAt.size() + " block index " +
              "entries out of " + blockKeys.size());
        }
      }
    }

    /**
     * The same as {@link #add(byte[], long, int, long)} but does not take the
     * number of sub-entries into account. Used for single-level indexes.
     *
     * @see #add(byte[], long, int, long)
     */
    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
      add(firstKey, blockOffset, onDiskDataSize, -1);
    }

    public void clear() {
      blockKeys.clear();
      blockOffsets.clear();
      onDiskDataSizes.clear();
      secondaryIndexOffsetMarks.clear();
      numSubEntriesAt.clear();
      curTotalNonRootEntrySize = 0;
      curTotalRootSize = 0;
    }

    /**
     * Finds the entry corresponding to the deeper-level index block containing
     * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
     * ordering of sub-entries.
     *
     * <p>
     * <i> Implementation note. </i> We are looking for i such that
     * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
     * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
     * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
     * sub-entries. i is by definition the insertion point of k in
     * numSubEntriesAt.
     *
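     * For example, with numSubEntriesAt = [3, 7, 10], entry #0 covers
     * sub-entries 0..2, entry #1 covers 3..6, and entry #2 covers 7..9; so
     * k = 5 maps to entry 1, and k = 7 (an exact match in the list) maps to
     * entry 2.
     *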
     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
     * @return the 0-based index of the entry corresponding to the given
     *         sub-entry
     */
    public int getEntryBySubEntry(long k) {
      // We define mid-key as the key corresponding to k'th sub-entry
      // (0-based).

      int i = Collections.binarySearch(numSubEntriesAt, k);

      // Exact match: numSubEntriesAt[i] = k. This means chunks #0 through
      // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
      // is in the (i + 1)'th chunk.
      if (i >= 0)
        return i + 1;

      // Inexact match. Return the insertion point.
      return -i - 1;
    }

    /**
     * Used when writing the root block index of a multi-level block index.
     * Serializes additional information that allows the mid-key to be
     * identified efficiently.
     *
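     * The serialized layout matches {@code MID_KEY_METADATA_SIZE}:
     *
     * <pre>
     * long: offset of the leaf-level block containing the mid-key
     * int:  on-disk size of that block, header not included
     * int:  zero-based index of the mid-key entry within that block
     * </pre>
     *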
     * @return a few serialized fields for finding the mid-key
     * @throws IOException if the mid-key metadata could not be created
     */
    public byte[] getMidKeyMetadata() throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(
          MID_KEY_METADATA_SIZE);
      DataOutputStream baosDos = new DataOutputStream(baos);
      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
      if (totalNumSubEntries == 0) {
        throw new IOException("No leaf-level entries, mid-key unavailable");
      }
      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);

      baosDos.writeLong(blockOffsets.get(midKeyEntry));
      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));

      long numSubEntriesBefore = midKeyEntry > 0
          ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE) {
        throw new IOException("Could not identify mid-key index within the "
            + "leaf-level block containing mid-key: out of range ("
            + subEntryWithinEntry + ", numSubEntriesBefore="
            + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
            + ")");
      }

      baosDos.writeInt((int) subEntryWithinEntry);

      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
        throw new IOException("Could not write mid-key metadata: size=" +
            baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
      }

      // Close just to be good citizens, although this has no effect.
      baos.close();

      return baos.toByteArray();
    }

    /**
     * Writes the block index chunk in the non-root index block format. This
     * format contains the number of entries, an index of integer offsets
     * for quick binary search on variable-length records, and tuples of
     * block offset, on-disk block size, and the first key for each entry.
     *
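     * The resulting layout is:
     *
     * <pre>
     * int:          number of entries (n)
     * int[n + 1]:   secondary index: entry offsets relative to the end of
     *               this array, plus one extra element for the total size
     * n times:      (long blockOffset, int onDiskSize, key bytes)
     * </pre>
     *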
     * @param out the output stream to write the block index chunk to
     * @throws IOException
     */
    void writeNonRoot(DataOutput out) throws IOException {
      // The number of entries in the block.
      out.writeInt(blockKeys.size());

      if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
        throw new IOException("Corrupted block index chunk writer: " +
            blockKeys.size() + " entries but " +
            secondaryIndexOffsetMarks.size() + " secondary index items");
      }

      // For each entry, write a "secondary index" of relative offsets to the
      // entries from the end of the secondary index. This works, because at
      // read time we read the number of entries and know where the secondary
      // index ends.
      for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
        out.writeInt(currentSecondaryIndex);

      // We include one other element in the secondary index to calculate the
      // size of each entry more easily by subtracting secondary index elements.
      out.writeInt(curTotalNonRootEntrySize);

      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        out.write(blockKeys.get(i));
      }
    }

    /**
     * @return the size of this chunk if stored in the non-root index block
     *         format
     */
    int getNonRootSize() {
      return Bytes.SIZEOF_INT                          // Number of entries
          + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
          + curTotalNonRootEntrySize;                  // All entries
    }

    /**
     * Writes this chunk into the given output stream in the root block index
     * format. This format is similar to the {@link HFile} version 1 block
     * index format, except that we store on-disk size of the block instead of
     * its uncompressed size.
     *
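     * Each entry is written as a long block offset, an int on-disk size, and
     * the key as a vint-length-prefixed byte array (see
     * {@link Bytes#writeByteArray(DataOutput, byte[])}).
     *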
     * @param out the data output stream to write the block index to. Typically
     *          a stream writing into an {@link HFile} block.
     * @throws IOException
     */
    void writeRoot(DataOutput out) throws IOException {
      for (int i = 0; i < blockKeys.size(); ++i) {
        out.writeLong(blockOffsets.get(i));
        out.writeInt(onDiskDataSizes.get(i));
        Bytes.writeByteArray(out, blockKeys.get(i));
      }
    }

    /**
     * @return the size of this chunk if stored in the root index block format
     */
    int getRootSize() {
      return curTotalRootSize;
    }

    /**
     * @return the number of entries in this block index chunk
     */
    public int getNumEntries() {
      return blockKeys.size();
    }

    public byte[] getBlockKey(int i) {
      return blockKeys.get(i);
    }

    public long getBlockOffset(int i) {
      return blockOffsets.get(i);
    }

    public int getOnDiskDataSize(int i) {
      return onDiskDataSizes.get(i);
    }

    public long getCumulativeNumKV(int i) {
      if (i < 0)
        return 0;
      return numSubEntriesAt.get(i);
    }

  }

  public static int getMaxChunkSize(Configuration conf) {
    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
  }
}