1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataOutputStream;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellComparator;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.KeyValue;
43  import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
44  import org.apache.hadoop.hbase.KeyValueUtil;
45  import org.apache.hadoop.hbase.io.HeapSize;
46  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
47  import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
48  import org.apache.hadoop.hbase.util.ByteBufferUtils;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.ClassSize;
51  import org.apache.hadoop.io.WritableUtils;
52  import org.apache.hadoop.util.StringUtils;
53  
54  /**
55   * Provides functionality to write ({@link BlockIndexWriter}) and read
56   * ({@link BlockIndexReader})
57   * single-level and multi-level block indexes.
58   *
59   * Examples of how to use the block index writer can be found in
60   * {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
61   *  {@link HFileWriterImpl}. Examples of how to use the reader can be
62  *  found in {@link HFileReaderImpl} and
63   *  {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
64   */
65  @InterfaceAudience.Private
66  public class HFileBlockIndex {
67  
68    private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
69  
70    static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
71  
72    /**
73    * The maximum size guideline for index blocks (leaf, intermediate, and
74    * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
75     */
76    public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
77  
78    /**
79     * The number of bytes stored in each "secondary index" entry in addition to
80     * key bytes in the non-root index block format. The first long is the file
81     * offset of the deeper-level block the entry points to, and the int that
82     * follows is that block's on-disk size without including header.
83     */
84    static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
85        + Bytes.SIZEOF_LONG;
86  
87    /**
88     * Error message when trying to use inline block API in single-level mode.
89     */
90    private static final String INLINE_BLOCKS_NOT_ALLOWED =
91        "Inline blocks are not allowed in the single-level-only mode";
92  
93    /**
94     * The size of a meta-data record used for finding the mid-key in a
95     * multi-level index. Consists of the middle leaf-level index block offset
96     * (long), its on-disk size without header included (int), and the mid-key
97     * entry's zero-based index in that leaf index block.
98     */
99    private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
100       2 * Bytes.SIZEOF_INT;
101 
102   /**
103    * An implementation of the BlockIndexReader that deals with block keys which are plain
104    * byte[], like the meta block index or the Bloom block index for a ROW bloom.
105    * Does not need a comparator; it can work with Bytes.BYTES_RAWCOMPARATOR.
106    */
107    static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {
108 
109     private byte[][] blockKeys;
110 
111     public ByteArrayKeyBlockIndexReader(final int treeLevel,
112         final CachingBlockReader cachingBlockReader) {
113       this(treeLevel);
114       this.cachingBlockReader = cachingBlockReader;
115     }
116 
117     public ByteArrayKeyBlockIndexReader(final int treeLevel) {
118       // treeLevel is 1 for a single-level index such as the meta index
119       searchTreeLevel = treeLevel;
120     }
121 
122     protected long calculateHeapSizeForBlockKeys(long heapSize) {
123       // Calculating the size of blockKeys
124       if (blockKeys != null) {
125         heapSize += ClassSize.REFERENCE;
126         // Adding array + references overhead
127         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
128 
129         // Adding bytes
130         for (byte[] key : blockKeys) {
131           heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
132         }
133       }
134       return heapSize;
135     }
136 
137     @Override
138     public boolean isEmpty() {
139       return blockKeys.length == 0;
140     }
141 
142     /**
143      * @param i
144      *          from 0 to {@link #getRootBlockCount()} - 1
145      */
146     public byte[] getRootBlockKey(int i) {
147       return blockKeys[i];
148     }
149 
150     @Override
151     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
152         boolean cacheBlocks, boolean pread, boolean isCompaction,
153         DataBlockEncoding expectedDataBlockEncoding) throws IOException {
154       // Not needed for this byte[]-keyed implementation.
155       return null;
156     }
157 
158     @Override
159     public Cell midkey() throws IOException {
160       // Not needed here
161       return null;
162     }
163 
164     protected void initialize(int numEntries) {
165       blockKeys = new byte[numEntries][];
166     }
167 
168     protected void add(final byte[] key, final long offset, final int dataSize) {
169       blockOffsets[rootCount] = offset;
170       blockKeys[rootCount] = key;
171       blockDataSizes[rootCount] = dataSize;
172       rootCount++;
173     }
174 
175     @Override
176     public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
177       int pos = Bytes.binarySearch(blockKeys, key, offset, length);
178       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
179       // binarySearch's javadoc.
180 
181       if (pos >= 0) {
182         // This means this is an exact match with an element of blockKeys.
183         assert pos < blockKeys.length;
184         return pos;
185       }
186 
187       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
188       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
189       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
190       // key < blockKeys[0], meaning the file does not contain the given key.
191 
192       int i = -pos - 1;
193       assert 0 <= i && i <= blockKeys.length;
194       return i - 1;
195     }
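    // A self-contained sketch (not part of HBase) of the insertion-point
    // arithmetic above, using java.util.Arrays.binarySearch, which follows
    // the same -(i + 1) contract as Bytes.binarySearch. For sorted first
    // keys {10, 20, 30} and search key 25, pos == -3, so the containing
    // block is -pos - 1 - 1 == 1.
    private static int containingBlockSketch(long[] firstKeys, long key) {
      int pos = java.util.Arrays.binarySearch(firstKeys, key);
      return pos >= 0 ? pos : (-pos - 1) - 1; // -1 if key < firstKeys[0]
    }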
196 
197     @Override
198     public int rootBlockContainingKey(Cell key) {
199       // Should not be called on this because here it deals only with byte[]
200       throw new UnsupportedOperationException(
201           "Cannot search for a key that is of Cell type. Only plain byte array keys " +
202           "can be searched for");
203     }
204 
205     @Override
206     public String toString() {
207       StringBuilder sb = new StringBuilder();
208       sb.append("size=" + rootCount).append("\n");
209       for (int i = 0; i < rootCount; i++) {
210         sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
211             .append("\n  offset=").append(blockOffsets[i])
212             .append(", dataSize=" + blockDataSizes[i]).append("\n");
213       }
214       return sb.toString();
215     }
216 
217   }
218 
219   /**
220    * An implementation of the BlockIndexReader that deals with block keys which are the key
221    * part of a cell, like the data block index or the ROW_COL Bloom blocks.
222    * This needs a comparator to work with the Cells.
223    */
224    static class CellBasedKeyBlockIndexReader extends BlockIndexReader {
225 
226     private Cell[] blockKeys;
227     /** Pre-computed mid-key */
228     private AtomicReference<Cell> midKey = new AtomicReference<Cell>();
229     /** Needed when doing lookup on blocks. */
230     private CellComparator comparator;
231 
232     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel,
233         final CachingBlockReader cachingBlockReader) {
234       this(c, treeLevel);
235       this.cachingBlockReader = cachingBlockReader;
236     }
237 
238     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
239       // Can be null for METAINDEX block
240       comparator = c;
241       searchTreeLevel = treeLevel;
242     }
243 
244     protected long calculateHeapSizeForBlockKeys(long heapSize) {
245       if (blockKeys != null) {
246         heapSize += ClassSize.REFERENCE;
247         // Adding array + references overhead
248         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
249 
250         // Adding blockKeys
251         for (Cell key : blockKeys) {
252           heapSize += ClassSize.align(CellUtil.estimatedHeapSizeOf(key));
253         }
254       }
255       // Add comparator and the midkey atomicreference
256       heapSize += 2 * ClassSize.REFERENCE;
257       return heapSize;
258     }
259 
260     @Override
261     public boolean isEmpty() {
262       return blockKeys.length == 0;
263     }
264 
265     /**
266      * @param i
267      *          from 0 to {@link #getRootBlockCount()} - 1
268      */
269     public Cell getRootBlockKey(int i) {
270       return blockKeys[i];
271     }
272     @Override
273     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
274         boolean cacheBlocks, boolean pread, boolean isCompaction,
275         DataBlockEncoding expectedDataBlockEncoding) throws IOException {
276       int rootLevelIndex = rootBlockContainingKey(key);
277       if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
278         return null;
279       }
280 
281       // the next indexed key
282       Cell nextIndexedKey = null;
283 
284       // Read the next-level (intermediate or leaf) index block.
285       long currentOffset = blockOffsets[rootLevelIndex];
286       int currentOnDiskSize = blockDataSizes[rootLevelIndex];
287 
288       if (rootLevelIndex < blockKeys.length - 1) {
289         nextIndexedKey = blockKeys[rootLevelIndex + 1];
290       } else {
291         nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY;
292       }
293 
294       int lookupLevel = 1; // How many levels deep we are in our lookup.
295       int index = -1;
296 
297       HFileBlock block;
298       KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
299       while (true) {
300 
301         if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
303           // Avoid reading the same block again, even with caching turned off.
304           // This is crucial for compaction-type workload which might have
305           // caching turned off. This is like a one-block cache inside the
306           // scanner.
307           block = currentBlock;
308         } else {
309           // Call HFile's caching block reader API. We always cache index
310           // blocks, otherwise we might get terrible performance.
311           boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
312           BlockType expectedBlockType;
313           if (lookupLevel < searchTreeLevel - 1) {
314             expectedBlockType = BlockType.INTERMEDIATE_INDEX;
315           } else if (lookupLevel == searchTreeLevel - 1) {
316             expectedBlockType = BlockType.LEAF_INDEX;
317           } else {
318             // this also accounts for ENCODED_DATA
319             expectedBlockType = BlockType.DATA;
320           }
321           block = cachingBlockReader.readBlock(currentOffset,
322               currentOnDiskSize, shouldCache, pread, isCompaction, true,
323               expectedBlockType, expectedDataBlockEncoding);
324         }
325 
326         if (block == null) {
327           throw new IOException("Failed to read block at offset " +
328               currentOffset + ", onDiskSize=" + currentOnDiskSize);
329         }
330 
331         // Found a data block, break the loop and check our level in the tree.
332         if (block.getBlockType().isData()) {
333           break;
334         }
335 
336         // Not a data block. This must be a leaf-level or intermediate-level
337         // index block. We don't allow going deeper than searchTreeLevel.
338         if (++lookupLevel > searchTreeLevel) {
339           throw new IOException("Search Tree Level overflow: lookupLevel="+
340               lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
341         }
342 
343         // Locate the entry corresponding to the given key in the non-root
344         // (leaf or intermediate-level) index block.
345         ByteBuffer buffer = block.getBufferWithoutHeader();
346         index = locateNonRootIndexEntry(buffer, key, comparator);
347         if (index == -1) {
348           // TODO: avoid the copy; for now convert to a KeyValue so the
349           // offending key can be printed
350           KeyValue kv = KeyValueUtil.ensureKeyValue(key);
351           throw new IOException("The key "
352               + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
353               + " is before the first key of the non-root index block "
354               + block);
355         }
356 
357         currentOffset = buffer.getLong();
358         currentOnDiskSize = buffer.getInt();
359 
360         // Only update next indexed key if there is a next indexed key in the current level
361         byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
362         if (nonRootIndexedKey != null) {
363           tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
364           nextIndexedKey = tmpNextIndexKV;
365         }
366       }
367 
368       if (lookupLevel != searchTreeLevel) {
369         throw new IOException("Reached a data block at level " + lookupLevel +
370             " but the number of levels is " + searchTreeLevel);
371       }
372 
373       // set the next indexed key for the current block.
374       BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
375       return blockWithScanInfo;
376     }
377 
378     @Override
379     public Cell midkey() throws IOException {
380       if (rootCount == 0)
381         throw new IOException("HFile empty");
382 
383       Cell targetMidKey = this.midKey.get();
384       if (targetMidKey != null) {
385         return targetMidKey;
386       }
387 
388       if (midLeafBlockOffset >= 0) {
389         if (cachingBlockReader == null) {
390           throw new IOException("Have to read the middle leaf block but " +
391               "no block reader available");
392         }
393 
394         // Caching, using pread, assuming this is not a compaction.
395         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
396             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
397             BlockType.LEAF_INDEX, null);
398 
399         ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
400         int numDataBlocks = b.getInt();
401         int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
402         int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
403             keyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
404         int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
405             + SECONDARY_INDEX_ENTRY_OVERHEAD;
406         byte[] bytes = ByteBufferUtils.toBytes(b, keyOffset, keyLen);
407         targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
408       } else {
409         // The middle of the root-level index.
410         targetMidKey = blockKeys[rootCount / 2];
411       }
412 
413       this.midKey.set(targetMidKey);
414       return targetMidKey;
415     }
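    // Worked example of the mid-key arithmetic above (values are
    // illustrative): with numDataBlocks = 100 and midKeyEntry = 42,
    // keyRelOffset is the secondary-index int at byte 4 * (42 + 1), keyLen is
    // the next secondary-index int minus keyRelOffset minus
    // SECONDARY_INDEX_ENTRY_OVERHEAD (12), and the key bytes start at
    // 4 * (100 + 2) + keyRelOffset + 12.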
416 
417     protected void initialize(int numEntries) {
418       blockKeys = new Cell[numEntries];
419     }
420 
421     /**
422      * Adds a new entry in the root block index. Only used when reading.
423      *
424      * @param key Last key in the block
425      * @param offset file offset where the block is stored
426      * @param dataSize the on-disk data size (version 2), or the uncompressed data size (version 1)
427      */
428     protected void add(final byte[] key, final long offset, final int dataSize) {
429       blockOffsets[rootCount] = offset;
430       // Create the blockKeys as Cells once when the reader is opened
431       blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
432       blockDataSizes[rootCount] = dataSize;
433       rootCount++;
434     }
435 
436     @Override
437     public int rootBlockContainingKey(final byte[] key, int offset, int length,
438         CellComparator comp) {
439       // This should always be called with a Cell, not with a byte[] key
440       throw new UnsupportedOperationException("Cannot search for a key containing a plain " +
441           "byte array. Only Cell-based keys can be searched for");
442     }
443 
444     @Override
445     public int rootBlockContainingKey(Cell key) {
446       // The comparator must not be null here, as this search is over the root-level block keys
447       int pos = Bytes.binarySearch(blockKeys, key, comparator);
448       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
449       // binarySearch's javadoc.
450 
451       if (pos >= 0) {
452         // This means this is an exact match with an element of blockKeys.
453         assert pos < blockKeys.length;
454         return pos;
455       }
456 
457       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
458       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
459       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
460       // key < blockKeys[0], meaning the file does not contain the given key.
461 
462       int i = -pos - 1;
463       assert 0 <= i && i <= blockKeys.length;
464       return i - 1;
465     }
466 
467     @Override
468     public String toString() {
469       StringBuilder sb = new StringBuilder();
470       sb.append("size=" + rootCount).append("\n");
471       for (int i = 0; i < rootCount; i++) {
472         sb.append("key=").append(blockKeys[i])
473             .append("\n  offset=").append(blockOffsets[i])
474             .append(", dataSize=" + blockDataSizes[i]).append("\n");
475       }
476       return sb.toString();
477     }
478   }
479    /**
480    * The reader will always hold the root level index in memory. Index
481    * blocks at all other levels will be cached in the LRU cache in practice,
482    * although this API does not enforce that.
483    *
484    * All non-root (leaf and intermediate) index blocks contain what we call a
485    * "secondary index": an array of offsets to the entries within the block.
486    * This allows us to do binary search for the entry corresponding to the
487    * given key without having to deserialize the block.
488    */
489    static abstract class BlockIndexReader implements HeapSize {
490 
491     protected long[] blockOffsets;
492     protected int[] blockDataSizes;
493     protected int rootCount = 0;
494 
495     // Mid-key metadata.
496     protected long midLeafBlockOffset = -1;
497     protected int midLeafBlockOnDiskSize = -1;
498     protected int midKeyEntry = -1;
499 
500     /**
501      * The number of levels in the block index tree. One if there is only root
502      * level, two for root and leaf levels, etc.
503      */
504     protected int searchTreeLevel;
505 
506     /** A way to read {@link HFile} blocks at a given offset */
507     protected CachingBlockReader cachingBlockReader;
508 
509     /**
510      * @return true if the block index is empty.
511      */
512     public abstract boolean isEmpty();
513 
514     /**
515      * Verifies that the block index is non-empty and throws an
516      * {@link IllegalStateException} otherwise.
517      */
518     public void ensureNonEmpty() {
519       if (isEmpty()) {
520         throw new IllegalStateException("Block index is empty or not loaded");
521       }
522     }
523 
524     /**
525      * Return the data block which contains this key. This function will only
526      * be called when the HFile version is larger than 1.
527      *
528      * @param key the key we are looking for
529      * @param currentBlock the current block, to avoid re-reading the same block
530      * @param cacheBlocks whether blocks read should be added to the block cache
531      * @param pread whether to use positional read (pread) rather than seek+read
532      * @param isCompaction whether the read is part of a compaction
533      * @param expectedDataBlockEncoding the data block encoding the caller is
534      *          expecting the data block to be in, or null to not perform this
535      *          check and return the block irrespective of the encoding
536      * @return reader a basic way to load blocks
537      * @throws IOException
538      */
539     public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
540         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
541         throws IOException {
542       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
543           cacheBlocks,
544           pread, isCompaction, expectedDataBlockEncoding);
545       if (blockWithScanInfo == null) {
546         return null;
547       } else {
548         return blockWithScanInfo.getHFileBlock();
549       }
550     }
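    // A hypothetical usage sketch (reader, cell and the boolean flags are
    // assumed to be in scope): seek to the data block that may contain a
    // given cell.
    //
    //   HFileBlock b = indexReader.seekToDataBlock(cell, null, cacheBlocks,
    //       pread, isCompaction, null);
    //   if (b == null) {
    //     // the cell sorts before the first key of the file
    //   }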
551 
552     /**
553      * Return the BlockWithScanInfo which contains the DataBlock with other scan
554      * info such as nextIndexedKey. This function will only be called when the
555      * HFile version is larger than 1.
556      * 
557      * @param key
558      *          the key we are looking for
559      * @param currentBlock
560      *          the current block, to avoid re-reading the same block
561      * @param cacheBlocks whether blocks read should be added to the block cache
562      * @param pread whether to use positional read (pread) rather than seek+read
563      * @param isCompaction whether the read is part of a compaction
564      * @param expectedDataBlockEncoding the data block encoding the caller is
565      *          expecting the data block to be in, or null to not perform this
566      *          check and return the block irrespective of the encoding.
567      * @return the BlockWithScanInfo which contains the DataBlock with other
568      *         scan info such as nextIndexedKey.
569      * @throws IOException
570      */
571     public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
572         boolean cacheBlocks,
573         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
574         throws IOException;
575 
576     /**
577      * An approximation to the {@link HFile}'s mid-key. Operates on block
578      * boundaries, and does not go inside blocks. In other words, returns the
579      * first key of the middle block of the file.
580      *
581      * @return the first key of the middle block
582      */
583     public abstract Cell midkey() throws IOException;
584 
585     /**
586      * @param i from 0 to {@link #getRootBlockCount()} - 1
587      */
588     public long getRootBlockOffset(int i) {
589       return blockOffsets[i];
590     }
591 
592     /**
593      * @param i zero-based index of a root-level block
594      * @return the on-disk size of the root-level block for version 2, or the
595      *         uncompressed size for version 1
596      */
597     public int getRootBlockDataSize(int i) {
598       return blockDataSizes[i];
599     }
600 
601     /**
602      * @return the number of root-level blocks in this block index
603      */
604     public int getRootBlockCount() {
605       return rootCount;
606     }
607 
608     /**
609      * Finds the root-level index block containing the given key.
610      * 
611      * @param key
612      *          Key to find
613      * @param comp
614      *          the comparator to be used
615      * @return Index of the root-level block containing <code>key</code>
616      *         (between 0 and the number of blocks - 1) or -1 if this file
617      *         does not contain the requested key.
618      */
619     // When we want to find the meta index block or bloom block for ROW bloom
620     // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
621     // CellComparator.
622     public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
623         CellComparator comp);
624 
625     /**
626      * Finds the root-level index block containing the given key.
627      *
628      * @param key
629      *          Key to find
630      * @return Index of the root-level block containing <code>key</code>
631      *         (between 0 and the number of blocks - 1) or -1 if this file
632      *         does not contain the requested key.
633      */
634     // When we want to find the meta index block or bloom block for ROW bloom
635     // type
636     // Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we
637     // need the CellComparator.
638     public int rootBlockContainingKey(final byte[] key, int offset, int length) {
639       return rootBlockContainingKey(key, offset, length, null);
640     }
641 
642     /**
643      * Finds the root-level index block containing the given key.
644      *
645      * @param key
646      *          Key to find
647      */
648     public abstract int rootBlockContainingKey(final Cell key);
649 
650     /**
651      * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
652      * @param nonRootIndex the non-root index block buffer
653      * @param i the ith position
654      * @return The indexed key at the ith position in the nonRootIndex.
655      */
656     protected byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
657       int numEntries = nonRootIndex.getInt(0);
658       if (i < 0 || i >= numEntries) {
659         return null;
660       }
661 
662       // Entries start after the number of entries and the secondary index.
663       // The secondary index takes numEntries + 1 ints.
664       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
665       // Target key's offset relative to the end of the secondary index
666       int targetKeyRelOffset = nonRootIndex.getInt(
667           Bytes.SIZEOF_INT * (i + 1));
668 
669       // The offset of the target key in the blockIndex buffer
670       int targetKeyOffset = entriesOffset     // Skip secondary index
671           + targetKeyRelOffset               // Skip all entries until mid
672           + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
673 
674       // We subtract the two consecutive secondary index elements, which
675       // gives us the size of the whole (offset, onDiskSize, key) tuple. We
676       // then need to subtract the overhead of offset and onDiskSize.
677       int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
678         targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
679 
680       // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
681       return ByteBufferUtils.toBytes(nonRootIndex, targetKeyOffset, targetKeyLength);
682     }
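    // A sketch (not part of HBase) that builds a toy non-root block in the
    // layout assumed above and reads entry #i's key back with the same
    // arithmetic: the entry count, numEntries + 1 relative offsets, then the
    // (offset, onDiskSize, key) tuples.
    private static byte[] toyNonRootKeyLookup(byte[][] keys, int i) {
      int[] rel = new int[keys.length + 1];
      for (int k = 0; k < keys.length; ++k) {
        rel[k + 1] = rel[k] + SECONDARY_INDEX_ENTRY_OVERHEAD + keys[k].length;
      }
      ByteBuffer b = ByteBuffer.allocate(
          Bytes.SIZEOF_INT * (keys.length + 2) + rel[keys.length]);
      b.putInt(keys.length);
      for (int r : rel) {
        b.putInt(r);
      }
      for (byte[] key : keys) {
        b.putLong(0L); // file offset of the deeper-level block (dummy)
        b.putInt(0);   // its on-disk size without header (dummy)
        b.put(key);
      }
      // The same offset arithmetic as getNonRootIndexedKey above:
      int entriesOffset = Bytes.SIZEOF_INT * (keys.length + 2);
      int relOffset = b.getInt(Bytes.SIZEOF_INT * (i + 1));
      int keyLen = b.getInt(Bytes.SIZEOF_INT * (i + 2)) - relOffset
          - SECONDARY_INDEX_ENTRY_OVERHEAD;
      return ByteBufferUtils.toBytes(b,
          entriesOffset + relOffset + SECONDARY_INDEX_ENTRY_OVERHEAD, keyLen);
    }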
683 
684     /**
685      * Performs a binary search over a non-root level index block. Utilizes the
686      * secondary index, which records the offsets of (offset, onDiskSize,
687      * firstKey) tuples of all entries.
688      * 
689      * @param key
690      *          the key we are searching for offsets to individual entries in
691      *          the blockIndex buffer
692      * @param nonRootIndex
693      *          the non-root index block buffer, starting with the secondary
694      *          index. The position is ignored.
695      * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
696      *         keys[i + 1], if keys is the array of all keys being searched, or
697      *         -1 otherwise
698      * @throws IOException
699      */
700     static int binarySearchNonRootIndex(Cell key, ByteBuffer nonRootIndex,
701         CellComparator comparator) {
702 
703       int numEntries = nonRootIndex.getInt(0);
704       int low = 0;
705       int high = numEntries - 1;
706       int mid = 0;
707 
708       // Entries start after the number of entries and the secondary index.
709       // The secondary index takes numEntries + 1 ints.
710       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
711 
712       // If we imagine that keys[-1] = -Infinity and
713       // keys[numEntries] = Infinity, then we are maintaining an invariant that
714       // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
715       KeyValue.KeyOnlyKeyValue nonRootIndexKV = new KeyValue.KeyOnlyKeyValue();
716       while (low <= high) {
717         mid = (low + high) >>> 1;
718 
719         // Midkey's offset relative to the end of secondary index
720         int midKeyRelOffset = nonRootIndex.getInt(
721             Bytes.SIZEOF_INT * (mid + 1));
722 
723         // The offset of the middle key in the blockIndex buffer
724         int midKeyOffset = entriesOffset       // Skip secondary index
725             + midKeyRelOffset                  // Skip all entries until mid
726             + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
727 
728         // We subtract the two consecutive secondary index elements, which
729         // gives us the size of the whole (offset, onDiskSize, key) tuple. We
730         // then need to subtract the overhead of offset and onDiskSize.
731         int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
732             midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
733 
734         // we have to compare in this order, because the comparator order
735         // has special logic when the 'left side' is a special key.
736         // TODO make KeyOnlyKeyValue Buffer-backed and avoid the array() call. This has to be
737         // done after HBASE-12224 & HBASE-12282.
739         nonRootIndexKV.setKey(nonRootIndex.array(),
740             nonRootIndex.arrayOffset() + midKeyOffset, midLength);
741         int cmp = comparator.compareKeyIgnoresMvcc(key, nonRootIndexKV);
742 
743         // key lives above the midpoint
744         if (cmp > 0)
745           low = mid + 1; // Maintain the invariant that keys[low - 1] < key
746         // key lives below the midpoint
747         else if (cmp < 0)
748           high = mid - 1; // Maintain the invariant that key < keys[high + 1]
749         else
750           return mid; // exact match
751       }
752 
753       // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
754       // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
755       // condition, low >= high + 1. Therefore, low = high + 1.
756 
757       if (low != high + 1) {
758         throw new IllegalStateException("Binary search broken: low=" + low
759             + " instead of " + (high + 1));
760       }
761 
762       // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
763       // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
764       int i = low - 1;
765 
766       // Some extra validation on the result.
767       if (i < -1 || i >= numEntries) {
768         throw new IllegalStateException("Binary search broken: result is " +
769             i + " but expected to be between -1 and (numEntries - 1) = " +
770             (numEntries - 1));
771       }
772 
773       return i;
774     }
775 
776     /**
777      * Search for one key using the secondary index in a non-root block. In case
778      * of success, positions the provided buffer at the entry of interest, where
779      * the file offset and the on-disk-size can be read.
780      *
781      * @param nonRootBlock
782      *          a non-root block without header. Initial position does not
783      *          matter.
784      * @param key
785      *          the byte array containing the key
786      * @return the index position where the given key was found, or -1 if
787      *         the given key is before the first key in the block
788      *
789      */
790     static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, Cell key,
791         CellComparator comparator) {
792       int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
793 
794       if (entryIndex != -1) {
795         int numEntries = nonRootBlock.getInt(0);
796 
797         // The end of secondary index and the beginning of entries themselves.
798         int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
799 
800         // The offset of the entry we are interested in relative to the end of
801         // the secondary index.
802         int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT * (1 + entryIndex));
803 
804         nonRootBlock.position(entriesOffset + entryRelOffset);
805       }
806 
807       return entryIndex;
808     }
809 
810     /**
811      * Read in the root-level index from the given input stream. Must match
812      * what was written into the root level by
813      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
814      * offset that function returned.
815      *
816      * @param in the buffered input stream or wrapped byte input stream
817      * @param numEntries the number of root-level index entries
818      * @throws IOException
819      */
820     public void readRootIndex(DataInput in, final int numEntries) throws IOException {
821       blockOffsets = new long[numEntries];
822       initialize(numEntries);
823       blockDataSizes = new int[numEntries];
824 
825       // If index size is zero, no index was written.
826       if (numEntries > 0) {
827         for (int i = 0; i < numEntries; ++i) {
828           long offset = in.readLong();
829           int dataSize = in.readInt();
830           byte[] key = Bytes.readByteArray(in);
831           add(key, offset, dataSize);
832         }
833       }
834     }
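    // Sketch of the root-level entry wire format consumed above (the writer
    // side lives in BlockIndexChunk#writeRoot): a long file offset, an int
    // on-disk size, then a vint-length-prefixed key per entry. Hypothetical
    // helper:
    private static void writeToyRootEntry(DataOutput out, long offset,
        int dataSize, byte[] key) throws IOException {
      out.writeLong(offset);
      out.writeInt(dataSize);
      Bytes.writeByteArray(out, key); // vint length followed by the key bytes
    }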
835 
836     protected abstract void initialize(int numEntries);
837 
838     protected abstract void add(final byte[] key, final long offset, final int dataSize);
839 
840     /**
841      * Read in the root-level index from the given input stream. Must match
842      * what was written into the root level by
843      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
844      * offset that function returned.
845      *
846      * @param blk the HFile block
847      * @param numEntries the number of root-level index entries
848      * @return the buffered input stream or wrapped byte input stream
849      * @throws IOException
850      */
851     public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
852       DataInputStream in = blk.getByteStream();
853       readRootIndex(in, numEntries);
854       return in;
855     }
856 
857     /**
858      * Read the root-level metadata of a multi-level block index. Based on
859      * {@link #readRootIndex(DataInput, int)}, but also reads metadata
860      * necessary to compute the mid-key in a multi-level index.
861      *
862      * @param blk the HFile block
863      * @param numEntries the number of root-level index entries
864      * @throws IOException
865      */
866     public void readMultiLevelIndexRoot(HFileBlock blk,
867         final int numEntries) throws IOException {
868       DataInputStream in = readRootIndex(blk, numEntries);
869       // after reading the root index the checksum bytes have to
870       // be subtracted to know if the mid key exists.
871       int checkSumBytes = blk.totalChecksumBytes();
872       if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
873         // No mid-key metadata available.
874         return;
875       }
876       midLeafBlockOffset = in.readLong();
877       midLeafBlockOnDiskSize = in.readInt();
878       midKeyEntry = in.readInt();
879     }
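    // Sketch of the optional mid-key metadata decoded above, appended after
    // the root-level entries (MID_KEY_METADATA_SIZE = 8 + 4 + 4 bytes): the
    // middle leaf block's offset and on-disk size, then the mid-key entry's
    // index within that block. Hypothetical writer-side helper; the real
    // producer is BlockIndexChunk#getMidKeyMetadata.
    private static void writeToyMidKeyMetadata(DataOutput out,
        long midLeafOffset, int midLeafOnDiskSize, int midKeyEntry)
        throws IOException {
      out.writeLong(midLeafOffset);
      out.writeInt(midLeafOnDiskSize);
      out.writeInt(midKeyEntry);
    }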
880 
881     @Override
882     public long heapSize() {
883       // The base BlockIndexReader does not hold the block keys, comparator or the midKey reference
884       long heapSize = ClassSize.align(3 * ClassSize.REFERENCE +
885           2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
886 
887       // Mid-key metadata.
888       heapSize += MID_KEY_METADATA_SIZE;
889 
890       heapSize = calculateHeapSizeForBlockKeys(heapSize);
891 
892       if (blockOffsets != null) {
893         heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
894             * Bytes.SIZEOF_LONG);
895       }
896 
897       if (blockDataSizes != null) {
898         heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
899             * Bytes.SIZEOF_INT);
900       }
901 
902       return ClassSize.align(heapSize);
903     }
904 
905     protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
906   }
907 
908   /**
909    * Writes the block index into the output stream. Generate the tree from
910    * bottom up. The leaf level is written to disk as a sequence of inline
911    * blocks, if it is larger than a certain number of bytes. If the leaf level
912    * is not large enough, we write all entries to the root level instead.
913    *
914    * After all leaf blocks have been written, we end up with an index
915    * referencing the resulting leaf index blocks. If that index is larger than
916    * the allowed root index size, the writer will break it up into
917    * reasonable-size intermediate-level index block chunks, write those chunks
918    * out, and create another index referencing those chunks. This will be
919    * repeated until the remaining index is small enough to become the root
920    * index. However, in most practical cases we will only have leaf-level
921    * blocks and the root index, or just the root index.
922    */
923   public static class BlockIndexWriter implements InlineBlockWriter {
924     /**
925      * While the index is being written, this represents the current block
926      * index referencing all leaf blocks, with one exception. If the file is
927      * being closed and there are not enough blocks to complete even a single
928      * leaf block, no leaf blocks get written and this contains the entire
929      * block index. After all levels of the index were written by
930      * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
931      * root-level index.
932      */
933     private BlockIndexChunk rootChunk = new BlockIndexChunk();
934 
935     /**
936      * Current leaf-level chunk. New entries referencing data blocks get added
937      * to this chunk until it grows large enough to be written to disk.
938      */
939     private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
940 
941     /**
942      * The number of block index levels. This is one if there is only root
943      * level (even empty), two if there is a leaf level and a root level, and is
944      * higher if there are intermediate levels. This is only final after
945      * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
946      * initial value accounts for the root level, and will be increased to two
947      * as soon as we find out there is a leaf-level in
948      * {@link #blockWritten(long, int, int)}.
949      */
950     private int numLevels = 1;
951 
952     private HFileBlock.Writer blockWriter;
953     private byte[] firstKey = null;
954 
955     /**
956      * The total number of leaf-level entries, i.e. entries referenced by
957      * leaf-level blocks. For the data block index this is equal to the number
958      * of data blocks.
959      */
960     private long totalNumEntries;
961 
962     /** Total compressed size of all index blocks. */
963     private long totalBlockOnDiskSize;
964 
965     /** Total uncompressed size of all index blocks. */
966     private long totalBlockUncompressedSize;
967 
968     /** The maximum size guideline of all multi-level index blocks. */
969     private int maxChunkSize;
970 
971     /** Whether we require this block index to always be single-level. */
972     private boolean singleLevelOnly;
973 
974     /** CacheConfig, or null if cache-on-write is disabled */
975     private CacheConfig cacheConf;
976 
977     /** Name to use for computing cache keys */
978     private String nameForCaching;
979 
980     /** Creates a single-level block index writer */
981     public BlockIndexWriter() {
982       this(null, null, null);
983       singleLevelOnly = true;
984     }
985 
986     /**
987      * Creates a multi-level block index writer.
988      *
989      * @param blockWriter the block writer to use to write index blocks
990      * @param cacheConf used to determine when and how a block should be cached-on-write.
991      */
992     public BlockIndexWriter(HFileBlock.Writer blockWriter,
993         CacheConfig cacheConf, String nameForCaching) {
994       if ((cacheConf == null) != (nameForCaching == null)) {
995         throw new IllegalArgumentException("Block cache and file name for " +
996             "caching must be both specified or both null");
997       }
998 
999       this.blockWriter = blockWriter;
1000       this.cacheConf = cacheConf;
1001       this.nameForCaching = nameForCaching;
1002       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
1003     }
1004 
1005     public void setMaxChunkSize(int maxChunkSize) {
1006       if (maxChunkSize <= 0) {
1007         throw new IllegalArgumentException("Invalid maximum index block size");
1008       }
1009       this.maxChunkSize = maxChunkSize;
1010     }
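    // A minimal sketch (hypothetical helper) of deriving the chunk size from
    // the configuration before calling setMaxChunkSize:
    static int chunkSizeFromConf(Configuration conf) {
      return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
    }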
1011 
1012     /**
1013      * Writes the root level and intermediate levels of the block index into
1014      * the output stream, generating the tree from bottom up. Assumes that the
1015      * leaf level has been inline-written to the disk if there is enough data
1016      * for more than one leaf block. We iterate by breaking the current level
1017      * of the block index, starting with the index of all leaf-level blocks,
1018      * into chunks small enough to be written to disk, and generate its parent
1019      * level, until we end up with a level small enough to become the root
1020      * level.
1021      *
1022      * If the leaf level is not large enough, there is no inline block index
1023      * anymore, so we only write that level of block index to disk as the root
1024      * level.
1025      *
1026      * @param out FSDataOutputStream
1027      * @return position at which we entered the root-level index.
1028      * @throws IOException
1029      */
1030     public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
1031       if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
1032         throw new IOException("Trying to write a multi-level block index, " +
1033             "but there are " + curInlineChunk.getNumEntries() + " entries in the " +
1034             "last inline chunk.");
1035       }
1036 
1037       // We need to get mid-key metadata before we create intermediate
1038       // indexes and overwrite the root chunk.
1039       byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
1040           : null;
1041 
1042       if (curInlineChunk != null) {
1043         while (rootChunk.getRootSize() > maxChunkSize) {
1044           rootChunk = writeIntermediateLevel(out, rootChunk);
1045           numLevels += 1;
1046         }
1047       }
1048 
1049       // write the root level
1050       long rootLevelIndexPos = out.getPos();
1051 
1052       {
1053         DataOutput blockStream =
1054             blockWriter.startWriting(BlockType.ROOT_INDEX);
1055         rootChunk.writeRoot(blockStream);
1056         if (midKeyMetadata != null)
1057           blockStream.write(midKeyMetadata);
1058         blockWriter.writeHeaderAndData(out);
1059       }
1060 
1061       // Add root index block size
1062       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1063       totalBlockUncompressedSize +=
1064           blockWriter.getUncompressedSizeWithoutHeader();
1065 
1066       if (LOG.isTraceEnabled()) {
1067         LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
1068           + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
1069           + " root-level entries, " + totalNumEntries + " total entries, "
1070           + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
1071           " on-disk size, "
1072           + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
1073           " total uncompressed size.");
1074       }
1075       return rootLevelIndexPos;
1076     }
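    // End-to-end sketch of the writer lifecycle (simplified; the real driver
    // is the HFile writer, and blockWriter/cacheConf/name/out are assumed to
    // be in scope):
    //
    //   BlockIndexWriter idx = new BlockIndexWriter(blockWriter, cacheConf, name);
    //   // for every data block written:
    //   //   idx.addEntry(firstKey, blockOffset, onDiskSize);
    //   // leaf chunks are flushed along the way via the InlineBlockWriter
    //   // protocol, and finally, at close time:
    //   long rootLevelIndexPos = idx.writeIndexBlocks(out);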
1077 
1078     /**
1079      * Writes the block index data as a single level only. Does not do any
1080      * block framing.
1081      *
1082      * @param out the buffered output stream to write the index to. Typically a
1083      *          stream writing into an {@link HFile} block.
1084      * @param description a short description of the index being written. Used
1085      *          in a log message.
1086      * @throws IOException
1087      */
1088     public void writeSingleLevelIndex(DataOutput out, String description)
1089         throws IOException {
1090       expectNumLevels(1);
1091 
1092       if (!singleLevelOnly)
1093         throw new IOException("Single-level mode is turned off");
1094 
1095       if (rootChunk.getNumEntries() > 0)
1096         throw new IOException("Root-level entries already added in " +
1097             "single-level mode");
1098 
1099       rootChunk = curInlineChunk;
1100       curInlineChunk = new BlockIndexChunk();
1101 
1102       if (LOG.isTraceEnabled()) {
1103         LOG.trace("Wrote a single-level " + description + " index with "
1104           + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
1105           + " bytes");
1106       }
1107       rootChunk.writeRoot(out);
1108     }
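    // Sketch of single-level use, as for a meta index (names and values are
    // illustrative): fill the chunk with addEntry(), then serialize it once
    // in root format into a caller-supplied stream.
    private static void singleLevelSketch(DataOutput out, byte[] key,
        long offset, int size) throws IOException {
      BlockIndexWriter toyIndex = new BlockIndexWriter(); // single-level only
      toyIndex.addEntry(key, offset, size);
      toyIndex.writeSingleLevelIndex(out, "toy");
    }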
1109 
1110     /**
1111      * Split the current level of the block index into intermediate index
1112      * blocks of permitted size and write those blocks to disk. Return the next
1113      * level of the block index referencing those intermediate-level blocks.
1114      *
1115      * @param out the stream to write the intermediate-level index blocks to
1116      * @param currentLevel the current level of the block index, such as a
1117      *          chunk referencing all leaf-level index blocks
1118      * @return the parent level block index, which becomes the root index after
1119      *         a few (usually zero) iterations
1120      * @throws IOException
1121      */
1122     private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
1123         BlockIndexChunk currentLevel) throws IOException {
1124       // Entries referencing intermediate-level blocks we are about to create.
1125       BlockIndexChunk parent = new BlockIndexChunk();
1126 
1127       // The current intermediate-level block index chunk.
1128       BlockIndexChunk curChunk = new BlockIndexChunk();
1129 
1130       for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
1131         curChunk.add(currentLevel.getBlockKey(i),
1132             currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
1133 
1134         if (curChunk.getRootSize() >= maxChunkSize)
1135           writeIntermediateBlock(out, parent, curChunk);
1136       }
1137 
1138       if (curChunk.getNumEntries() > 0) {
1139         writeIntermediateBlock(out, parent, curChunk);
1140       }
1141 
1142       return parent;
1143     }
1144 
1145     private void writeIntermediateBlock(FSDataOutputStream out,
1146         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
1147       long beginOffset = out.getPos();
1148       DataOutputStream dos = blockWriter.startWriting(
1149           BlockType.INTERMEDIATE_INDEX);
1150       curChunk.writeNonRoot(dos);
1151       byte[] curFirstKey = curChunk.getBlockKey(0);
1152       blockWriter.writeHeaderAndData(out);
1153 
1154       if (cacheConf != null) {
1155         HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1156         cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
1157           beginOffset), blockForCaching);
1158       }
1159 
1160       // Add intermediate index block size
1161       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1162       totalBlockUncompressedSize +=
1163           blockWriter.getUncompressedSizeWithoutHeader();
1164 
1165       // OFFSET is the beginning offset of the chunk of block index entries.
1166       // SIZE is the total byte size of the chunk of block index entries
1167       // + the secondary index size
1168       // FIRST_KEY is the first key in the chunk of block index
1169       // entries.
1170       parent.add(curFirstKey, beginOffset,
1171           blockWriter.getOnDiskSizeWithHeader());
1172 
1173       // clear current block index chunk
1174       curChunk.clear();
1175       curFirstKey = null;
1176     }
1177 
1178     /**
1179      * @return how many block index entries there are in the root level
1180      */
1181     public final int getNumRootEntries() {
1182       return rootChunk.getNumEntries();
1183     }
1184 
1185     /**
1186      * @return the number of levels in this block index.
1187      */
1188     public int getNumLevels() {
1189       return numLevels;
1190     }
1191 
1192     private void expectNumLevels(int expectedNumLevels) {
1193       if (numLevels != expectedNumLevels) {
1194         throw new IllegalStateException("Number of block index levels is "
1195             + numLevels + " but is expected to be " + expectedNumLevels);
1196       }
1197     }
1198 
1199     /**
1200      * Whether there is an inline block ready to be written. In general, we
1201      * write a leaf-level index block as an inline block as soon as its size
1202      * as serialized in the non-root format reaches a certain threshold.
1203      */
1204     @Override
1205     public boolean shouldWriteBlock(boolean closing) {
1206       if (singleLevelOnly) {
1207         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1208       }
1209 
1210       if (curInlineChunk == null) {
1211         throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
1212             "called with closing=true and then called again?");
1213       }
1214 
1215       if (curInlineChunk.getNumEntries() == 0) {
1216         return false;
1217       }
1218 
1219       // We do have some entries in the current inline chunk.
1220       if (closing) {
1221         if (rootChunk.getNumEntries() == 0) {
1222           // We did not add any leaf-level blocks yet. Instead of creating a
1223           // leaf level with one block, move these entries to the root level.
1224 
1225           expectNumLevels(1);
1226           rootChunk = curInlineChunk;
1227           curInlineChunk = null;  // Disallow adding any more index entries.
1228           return false;
1229         }
1230 
1231         return true;
1232       } else {
1233         return curInlineChunk.getNonRootSize() >= maxChunkSize;
1234       }
1235     }
1236 
1237     /**
1238      * Write out the current inline index block. Inline blocks are non-root
1239      * blocks, so the non-root index format is used.
1240      *
1241      * @param out
1242      */
1243     @Override
1244     public void writeInlineBlock(DataOutput out) throws IOException {
1245       if (singleLevelOnly)
1246         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1247 
1248       // Write the inline block index to the output stream in the non-root
1249       // index block format.
1250       curInlineChunk.writeNonRoot(out);
1251 
1252       // Save the first key of the inline block so that we can add it to the
1253       // parent-level index.
1254       firstKey = curInlineChunk.getBlockKey(0);
1255 
1256       // Start a new inline index block
1257       curInlineChunk.clear();
1258     }
1259 
1260     /**
1261      * Called after an inline block has been written so that we can add an
1262      * entry referring to that block to the parent-level index.
1263      */
1264     @Override
1265     public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
1266       // Add leaf index block size
1267       totalBlockOnDiskSize += onDiskSize;
1268       totalBlockUncompressedSize += uncompressedSize;
1269 
1270       if (singleLevelOnly)
1271         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1272 
1273       if (firstKey == null) {
1274         throw new IllegalStateException("Trying to add second-level index " +
1275             "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
1276             " but the first key was not set in writeInlineBlock");
1277       }
1278 
1279       if (rootChunk.getNumEntries() == 0) {
1280         // We are writing the first leaf block, so increase index level.
1281         expectNumLevels(1);
1282         numLevels = 2;
1283       }
1284 
1285       // Add another entry to the second-level index. Include the number of
1286       // entries in all previous leaf-level chunks for mid-key calculation.
1287       rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1288       firstKey = null;
1289     }
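    // A sketch (simplified from the HFile writer's inline-block loop) of how
    // the three-step protocol above is driven: check shouldWriteBlock(),
    // serialize the chunk, flush it, then report back where it landed.
    private void flushInlineChunkSketch(FSDataOutputStream out) throws IOException {
      if (shouldWriteBlock(false)) {
        long offset = out.getPos();
        DataOutputStream dos = blockWriter.startWriting(getInlineBlockType());
        writeInlineBlock(dos);
        blockWriter.writeHeaderAndData(out);
        blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
            blockWriter.getUncompressedSizeWithoutHeader());
      }
    }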
1290 
1291     @Override
1292     public BlockType getInlineBlockType() {
1293       return BlockType.LEAF_INDEX;
1294     }
1295 
1296     /**
1297      * Add one index entry to the current leaf-level block. When the leaf-level
1298      * block gets large enough, it will be flushed to disk as an inline block.
1299      *
1300      * @param firstKey the first key of the data block
1301      * @param blockOffset the offset of the data block
1302      * @param blockDataSize the on-disk size of the data block ({@link HFile}
1303      *          format version 2), or the uncompressed size of the data block
1304      *          ({@link HFile} format version 1).
1305      */
1306     public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
1307       curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1308       ++totalNumEntries;
1309     }
1310 
1311     /**
1312      * @throws IOException if we happened to write a multi-level index.
1313      */
1314     public void ensureSingleLevel() throws IOException {
1315       if (numLevels > 1) {
1316         throw new IOException("Wrote a " + numLevels + "-level index with " +
1317             rootChunk.getNumEntries() + " root-level entries, but " +
1318             "this is expected to be a single-level block index.");
1319       }
1320     }
1321 
1322     /**
1323      * @return true if we are using cache-on-write. This is configured by the
1324      *         caller of the constructor by either passing a valid block cache
1325      *         or null.
1326      */
1327     @Override
1328     public boolean getCacheOnWrite() {
1329       return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1330     }
1331 
1332     /**
1333      * The total uncompressed size of the root index block, intermediate-level
1334      * index blocks, and leaf-level index blocks.
1335      *
1336      * @return the total uncompressed size of all index blocks
1337      */
1338     public long getTotalUncompressedSize() {
1339       return totalBlockUncompressedSize;
1340     }
1341 
1342   }
1343 
1344   /**
1345    * A single chunk of the block index in the process of writing. The data in
1346    * this chunk can become a leaf-level, intermediate-level, or root index
1347    * block.
1348    */
1349   static class BlockIndexChunk {
1350 
1351     /** First keys of the key range corresponding to each index entry. */
1352     private final List<byte[]> blockKeys = new ArrayList<byte[]>();
1353 
1354     /** Block offset in backing stream. */
1355     private final List<Long> blockOffsets = new ArrayList<Long>();
1356 
1357     /** On-disk data sizes of lower-level data or index blocks. */
1358     private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
1359 
1360     /**
1361      * The cumulative number of sub-entries, i.e. entries in deeper-level block
1362      * index blocks. numSubEntriesAt[i] is the number of sub-entries in the
1363      * blocks corresponding to this chunk's entries #0 through #i inclusive.
1364      */
1365     private final List<Long> numSubEntriesAt = new ArrayList<Long>();
1366 
1367     /**
1368      * The offset of the next entry to be added, relative to the end of the
1369      * "secondary index" in the "non-root" format representation of this index
1370      * chunk. This is the next value to be added to the secondary index.
1371      */
1372     private int curTotalNonRootEntrySize = 0;
1373 
1374     /**
1375      * The accumulated size of this chunk if stored in the root index format.
1376      */
1377     private int curTotalRootSize = 0;
1378 
1379     /**
1380      * The "secondary index" used for binary search over variable-length
1381      * records in a "non-root" format block. These offsets are relative to the
1382      * end of this secondary index.
1383      */
1384     private final List<Integer> secondaryIndexOffsetMarks =
1385         new ArrayList<Integer>();
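    // Worked example (illustrative): after adding three keys of lengths 10,
    // 20, and 15 bytes, with a fixed per-entry overhead of
    // SECONDARY_INDEX_ENTRY_OVERHEAD = 12 (long offset + int size), the
    // offset marks are:
    //
    //   entry 0 -> 0
    //   entry 1 -> 0 + (12 + 10) = 22
    //   entry 2 -> 22 + (12 + 20) = 54
    //
    // Each mark is the entry's byte offset measured from the end of the
    // secondary index, which lets readers binary-search the variable-length
    // entries of a non-root block.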
1386 
1387     /**
1388      * Adds a new entry to this block index chunk.
1389      *
1390      * @param firstKey the first key in the block pointed to by this entry
1391      * @param blockOffset the offset of the next-level block pointed to by this
1392      *          entry
1393      * @param onDiskDataSize the on-disk data size of the block pointed to by
1394      *          this entry, including header size
1395      * @param curTotalNumSubEntries if this chunk is the root index chunk under
1396      *          construction, this specifies the current total number of
1397      *          sub-entries in all leaf-level chunks, including the one
1398      *          corresponding to the second-level entry being added.
1399      */
1400     void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1401         long curTotalNumSubEntries) {
1402       // Record the offset for the secondary index
1403       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1404       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1405           + firstKey.length;
1406 
1407       curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1408           + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1409 
1410       blockKeys.add(firstKey);
1411       blockOffsets.add(blockOffset);
1412       onDiskDataSizes.add(onDiskDataSize);
1413 
1414       if (curTotalNumSubEntries != -1) {
1415         numSubEntriesAt.add(curTotalNumSubEntries);
1416 
1417         // Make sure the parallel arrays are in sync.
1418         if (numSubEntriesAt.size() != blockKeys.size()) {
1419           throw new IllegalStateException("Only have key/value count " +
1420               "stats for " + numSubEntriesAt.size() + " block index " +
1421               "entries out of " + blockKeys.size());
1422         }
1423       }
1424     }
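    // Size accounting example (illustrative): for a 10-byte firstKey, again
    // assuming SECONDARY_INDEX_ENTRY_OVERHEAD = 12:
    //
    //   curTotalNonRootEntrySize += 12 + 10;   // = 22 bytes
    //   curTotalRootSize += 8 + 4 + 1 + 10;    // = 23 bytes, since
    //                                          // WritableUtils.getVIntSize(10) == 1
    //
    // The non-root total drives the secondary index; the root total sizes the
    // chunk when written via writeRoot(), where keys are vint-prefixed.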
1425 
1426     /**
1427      * The same as {@link #add(byte[], long, int, long)} but does not track
1428      * the cumulative sub-entry (key/value) count. Used for single-level indexes.
1429      *
1430      * @see #add(byte[], long, int, long)
1431      */
1432     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1433       add(firstKey, blockOffset, onDiskDataSize, -1);
1434     }
1435 
1436     public void clear() {
1437       blockKeys.clear();
1438       blockOffsets.clear();
1439       onDiskDataSizes.clear();
1440       secondaryIndexOffsetMarks.clear();
1441       numSubEntriesAt.clear();
1442       curTotalNonRootEntrySize = 0;
1443       curTotalRootSize = 0;
1444     }
1445 
1446     /**
1447      * Finds the entry corresponding to the deeper-level index block containing
1448      * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1449      * ordering of sub-entries.
1450      *
1451      * <p>
1452      * <i> Implementation note. </i> We are looking for i such that
1453      * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
1454      * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
1455      * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
1456      * sub-entries. i is by definition the insertion point of k in
1457      * numSubEntriesAt.
1458      *
1459      * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1460      * @return the 0-based index of the entry corresponding to the given
1461      *         sub-entry
1462      */
1463     public int getEntryBySubEntry(long k) {
1464       // We define mid-key as the key corresponding to k'th sub-entry
1465       // (0-based).
1466 
1467       int i = Collections.binarySearch(numSubEntriesAt, k);
1468 
1469       // Exact match: numSubEntriesAt[i] = k. This means chunks #0 through
1470       // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1471       // is in the (i + 1)'th chunk.
1472       if (i >= 0)
1473         return i + 1;
1474 
1475       // Inexact match. Return the insertion point.
1476       return -i - 1;
1477     }
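    // Worked example: with numSubEntriesAt = [3, 7, 12] (entries #0..#2):
    //
    //   getEntryBySubEntry(5) -> binarySearch returns -2 (insertion point 1),
    //       so the result is 1: sub-entry #5 lives in block #1, which holds
    //       sub-entries #3..#6.
    //   getEntryBySubEntry(7) -> exact match at i = 1, so the result is 2:
    //       sub-entry #7 is the first sub-entry of block #2 (#7..#11).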
1478 
1479     /**
1480      * Used when writing the root block index of a multi-level block index.
1481      * Serializes additional information that allows the mid-key to be
1482      * identified efficiently.
1483      *
1484      * @return a few serialized fields for finding the mid-key
1485      * @throws IOException if the mid-key metadata could not be created
1486      */
1487     public byte[] getMidKeyMetadata() throws IOException {
1488       ByteArrayOutputStream baos = new ByteArrayOutputStream(
1489           MID_KEY_METADATA_SIZE);
1490       DataOutputStream baosDos = new DataOutputStream(baos);
1491       long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
1492       if (totalNumSubEntries == 0) {
1493         throw new IOException("No leaf-level entries, mid-key unavailable");
1494       }
1495       long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1496       int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1497 
1498       baosDos.writeLong(blockOffsets.get(midKeyEntry));
1499       baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1500 
1501       long numSubEntriesBefore = midKeyEntry > 0
1502           ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1503       long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1504       if (subEntryWithinEntry < 0
1505           || subEntryWithinEntry > Integer.MAX_VALUE) {
1506         throw new IOException("Could not identify mid-key index within the "
1507             + "leaf-level block containing mid-key: out of range ("
1508             + subEntryWithinEntry + ", numSubEntriesBefore="
1509             + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1510             + ")");
1511       }
1512 
1513       baosDos.writeInt((int) subEntryWithinEntry);
1514 
1515       if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1516         throw new IOException("Could not write mid-key metadata: size=" +
1517             baosDos.size() + ", expected size=" + MID_KEY_METADATA_SIZE);
1518       }
1519 
1520       // Close just to be good citizens, although this has no effect.
1521       baos.close();
1522 
1523       return baos.toByteArray();
1524     }
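    // The metadata written above is, in order (16 bytes total, which is what
    // MID_KEY_METADATA_SIZE must equal):
    //
    //   long  offset of the leaf-level block containing the mid-key
    //   int   on-disk size of that block
    //   int   index of the mid-key within that block
    //
    // A reader can therefore seek straight to the mid-key without walking
    // the whole index.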
1525 
1526     /**
1527      * Writes the block index chunk in the non-root index block format. This
1528      * format contains the number of entries, an index of integer offsets
1529      * for quick binary search on variable-length records, and tuples of
1530      * block offset, on-disk block size, and the first key for each entry.
1531      *
1532      * @param out the data output to write the non-root index block to
1533      * @throws IOException if writing to the output fails
1534      */
1535     void writeNonRoot(DataOutput out) throws IOException {
1536       // The number of entries in the block.
1537       out.writeInt(blockKeys.size());
1538 
1539       if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1540         throw new IOException("Corrupted block index chunk writer: " +
1541             blockKeys.size() + " entries but " +
1542             secondaryIndexOffsetMarks.size() + " secondary index items");
1543       }
1544 
1545       // For each entry, write a "secondary index" of relative offsets to the
1546       // entries from the end of the secondary index. This works because at
1547       // read time we read the number of entries and know where the secondary
1548       // index ends.
1549       for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
1550         out.writeInt(currentSecondaryIndex);
1551 
1552       // We include one extra element in the secondary index so that the size
1553       // of each entry can be computed by subtracting adjacent offset marks.
1554       out.writeInt(curTotalNonRootEntrySize);
1555 
1556       for (int i = 0; i < blockKeys.size(); ++i) {
1557         out.writeLong(blockOffsets.get(i));
1558         out.writeInt(onDiskDataSizes.get(i));
1559         out.write(blockKeys.get(i));
1560       }
1561     }
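    // The resulting on-disk layout for n entries, as produced by the writes
    // above:
    //
    //   int       n                   // number of entries
    //   int[n+1]  secondary index     // offsets from the end of this array;
    //                                 // the last element is the total size of
    //                                 // all entries
    //   n times:
    //     long    block offset
    //     int     on-disk block size
    //     byte[]  first key           // raw bytes, no length prefix; lengths
    //                                 // come from consecutive secondary index
    //                                 // offsets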
1562 
1563     /**
1564      * @return the size of this chunk if stored in the non-root index block
1565      *         format
1566      */
1567     int getNonRootSize() {
1568       return Bytes.SIZEOF_INT                          // Number of entries
1569           + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1570           + curTotalNonRootEntrySize;                  // All entries
1571     }
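    // Example (illustrative): two entries with 10- and 20-byte keys and a
    // 12-byte fixed per-entry overhead occupy
    //
    //   4                        // entry count
    //   + 4 * (2 + 1)            // secondary index: n + 1 ints
    //   + (12 + 10) + (12 + 20)  // the entries themselves
    //   = 70 bytes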
1572 
1573     /**
1574      * Writes this chunk into the given output stream in the root block index
1575      * format. This format is similar to the {@link HFile} version 1 block
1576      * index format, except that we store on-disk size of the block instead of
1577      * its uncompressed size.
1578      *
1579      * @param out the data output stream to write the block index to. Typically
1580      *          a stream writing into an {@link HFile} block.
1581      * @throws IOException if writing to the output stream fails
1582      */
1583     void writeRoot(DataOutput out) throws IOException {
1584       for (int i = 0; i < blockKeys.size(); ++i) {
1585         out.writeLong(blockOffsets.get(i));
1586         out.writeInt(onDiskDataSizes.get(i));
1587         Bytes.writeByteArray(out, blockKeys.get(i));
1588       }
1589     }
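    // Root layout, per entry (derived from the writes above): a long offset,
    // an int on-disk size, and the first key written via
    // Bytes.writeByteArray(), i.e. vint-length-prefixed. Unlike the non-root
    // format, there is no entry count or secondary index here; the reader
    // obtains the root entry count from metadata stored elsewhere in the file
    // (the HFile trailer).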
1590 
1591     /**
1592      * @return the size of this chunk if stored in the root index block format
1593      */
1594     int getRootSize() {
1595       return curTotalRootSize;
1596     }
1597 
1598     /**
1599      * @return the number of entries in this block index chunk
1600      */
1601     public int getNumEntries() {
1602       return blockKeys.size();
1603     }
1604 
1605     public byte[] getBlockKey(int i) {
1606       return blockKeys.get(i);
1607     }
1608 
1609     public long getBlockOffset(int i) {
1610       return blockOffsets.get(i);
1611     }
1612 
1613     public int getOnDiskDataSize(int i) {
1614       return onDiskDataSizes.get(i);
1615     }
1616 
1617     public long getCumulativeNumKV(int i) {
1618       if (i < 0)
1619         return 0;
1620       return numSubEntriesAt.get(i);
1621     }
1622 
1623   }
1624 
1625   public static int getMaxChunkSize(Configuration conf) {
1626     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1627   }
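  // Configuration sketch (hypothetical values): smaller chunks flush sooner
  // and can produce a deeper multi-level index; larger chunks produce a
  // shallower index with bigger blocks.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024); // 128K default
  //   int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf);   // 65536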
1628 }