1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FSDataOutputStream;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellComparator;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
43  import org.apache.hadoop.hbase.KeyValueUtil;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.io.HeapSize;
46  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
47  import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
48  import org.apache.hadoop.hbase.nio.ByteBuff;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.ClassSize;
51  import org.apache.hadoop.hbase.util.Pair;
52  import org.apache.hadoop.io.WritableUtils;
53  import org.apache.hadoop.util.StringUtils;
54  
55  /**
56   * Provides functionality to write ({@link BlockIndexWriter}) and read
57   * ({@link BlockIndexReader})
58   * single-level and multi-level block indexes.
59   *
60   * Examples of how to use the block index writer can be found in
61   * {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
62   *  {@link HFileWriterImpl}. Examples of how to use the reader can be
63   *  found in {@link HFileReaderImpl} and
64   *  {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
65   */
66  @InterfaceAudience.Private
67  public class HFileBlockIndex {
68  
69    private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
70  
71    static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
72  
73    /**
74    * The maximum size guideline for index blocks (leaf, intermediate, and
75     * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
76     */
77    public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
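  // For example, the guideline can be lowered in hbase-site.xml (the value
  // here is hypothetical):
  //   <property>
  //     <name>hfile.index.block.max.size</name>
  //     <value>65536</value>
  //   </property>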
78  
79    /**
80     * The number of bytes stored in each "secondary index" entry in addition to
81     * key bytes in the non-root index block format. The first long is the file
82     * offset of the deeper-level block the entry points to, and the int that
83     * follows is that block's on-disk size without including header.
84     */
85    static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
86        + Bytes.SIZEOF_LONG;
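
  /*
   * A sketch (for orientation, not normative text) of the non-root index
   * block layout implied by the reads below:
   *
   *   int numEntries
   *   int secondaryIndex[numEntries + 1]  // entry offsets relative to the end
   *                                       // of this array; the last element
   *                                       // is the total size of all entries
   *   for each entry:
   *     long offset      // file offset of the deeper-level block
   *     int  onDiskSize  // that block's on-disk size, header excluded
   *     byte[] key       // key bytes, not length-prefixed
   *
   * Entry i therefore starts at byte SIZEOF_INT * (numEntries + 2) +
   * secondaryIndex[i], and its key begins SECONDARY_INDEX_ENTRY_OVERHEAD
   * bytes later.
   */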
87  
88    /**
89     * Error message when trying to use inline block API in single-level mode.
90     */
91    private static final String INLINE_BLOCKS_NOT_ALLOWED =
92        "Inline blocks are not allowed in the single-level-only mode";
93  
94    /**
95     * The size of a meta-data record used for finding the mid-key in a
96     * multi-level index. Consists of the middle leaf-level index block offset
97     * (long), its on-disk size without header included (int), and the mid-key
98     * entry's zero-based index in that leaf index block.
99     */
100   private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
101       2 * Bytes.SIZEOF_INT;
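
  /*
   * A sketch of that record as read back in readMultiLevelIndexRoot:
   *
   *   long midLeafBlockOffset      // offset of the middle leaf-level index block
   *   int  midLeafBlockOnDiskSize  // its on-disk size, header excluded
   *   int  midKeyEntry             // zero-based entry index within that block
   */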
102 
103   /**
104    * An implementation of the BlockIndexReader that deals with block keys which are plain
105    * byte[], like the meta block index or the Bloom block index for ROW Bloom filters.
106    * Does not need a CellComparator; it works with Bytes.BYTES_RAWCOMPARATOR.
107    */
108    static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {
109 
110     private byte[][] blockKeys;
111 
112     public ByteArrayKeyBlockIndexReader(final int treeLevel,
113         final CachingBlockReader cachingBlockReader) {
114       this(treeLevel);
115       this.cachingBlockReader = cachingBlockReader;
116     }
117 
118     public ByteArrayKeyBlockIndexReader(final int treeLevel) {
119       // No comparator is needed; this index works on plain byte[] keys
120       searchTreeLevel = treeLevel;
121     }
122 
123     protected long calculateHeapSizeForBlockKeys(long heapSize) {
124       // Calculating the size of blockKeys
125       if (blockKeys != null) {
126         heapSize += ClassSize.REFERENCE;
127         // Adding array + references overhead
128         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
129 
130         // Adding bytes
131         for (byte[] key : blockKeys) {
132           heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
133         }
134       }
135       return heapSize;
136     }
137 
138     @Override
139     public boolean isEmpty() {
140       return blockKeys.length == 0;
141     }
142 
143     /**
144      * @param i
145      *          from 0 to {@link #getRootBlockCount()} - 1
146      */
147     public byte[] getRootBlockKey(int i) {
148       return blockKeys[i];
149     }
150 
151     @Override
152     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
153         boolean cacheBlocks, boolean pread, boolean isCompaction,
154         DataBlockEncoding expectedDataBlockEncoding) throws IOException {
155       // Not needed by this byte[]-based index implementation
156       return null;
157     }
158 
159     @Override
160     public Cell midkey() throws IOException {
161       // Not needed here
162       return null;
163     }
164 
165     protected void initialize(int numEntries) {
166       blockKeys = new byte[numEntries][];
167     }
168 
169     protected void add(final byte[] key, final long offset, final int dataSize) {
170       blockOffsets[rootCount] = offset;
171       blockKeys[rootCount] = key;
172       blockDataSizes[rootCount] = dataSize;
173       rootCount++;
174     }
175 
176     @Override
177     public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
178       int pos = Bytes.binarySearch(blockKeys, key, offset, length);
179       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
180       // binarySearch's javadoc.
181 
182       if (pos >= 0) {
183         // This means this is an exact match with an element of blockKeys.
184         assert pos < blockKeys.length;
185         return pos;
186       }
187 
188       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
189       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
190       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
191       // key < blockKeys[0], meaning the file does not contain the given key.
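
      // A hypothetical worked example: for blockKeys {"c", "f", "i"} and
      // key "d", binarySearch returns pos = -2, so i = 1 and we return
      // j = 0, because blockKeys[0] = "c" <= "d" < "f" = blockKeys[1].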
192 
193       int i = -pos - 1;
194       assert 0 <= i && i <= blockKeys.length;
195       return i - 1;
196     }
197 
198     @Override
199     public int rootBlockContainingKey(Cell key) {
200       // Should not be called on this because here it deals only with byte[]
201       throw new UnsupportedOperationException(
202           "Cannot search for a key that is of Cell type. Only plain byte array keys " +
203           "can be searched for");
204     }
205 
206     @Override
207     public String toString() {
208       StringBuilder sb = new StringBuilder();
209       sb.append("size=" + rootCount).append("\n");
210       for (int i = 0; i < rootCount; i++) {
211         sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
212             .append("\n  offset=").append(blockOffsets[i])
213             .append(", dataSize=" + blockDataSizes[i]).append("\n");
214       }
215       return sb.toString();
216     }
217 
218   }
219 
220   /**
221    * An implementation of the BlockIndexReader that deals with block keys which are the key
222    * part of a cell, like the data block index or the ROW_COL Bloom blocks.
223    * This needs a comparator to work with the Cells.
224    */
225    static class CellBasedKeyBlockIndexReader extends BlockIndexReader {
226 
227     private Cell[] blockKeys;
228     /** Pre-computed mid-key */
229     private AtomicReference<Cell> midKey = new AtomicReference<Cell>();
230     /** Needed when doing lookups on blocks. */
231     private CellComparator comparator;
232 
233     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel,
234         final CachingBlockReader cachingBlockReader) {
235       this(c, treeLevel);
236       this.cachingBlockReader = cachingBlockReader;
237     }
238 
239     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
240       // Can be null for METAINDEX block
241       comparator = c;
242       searchTreeLevel = treeLevel;
243     }
244 
245     protected long calculateHeapSizeForBlockKeys(long heapSize) {
246       if (blockKeys != null) {
247         heapSize += ClassSize.REFERENCE;
248         // Adding array + references overhead
249         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
250 
251         // Adding blockKeys
252         for (Cell key : blockKeys) {
253           heapSize += ClassSize.align(CellUtil.estimatedHeapSizeOf(key));
254         }
255       }
256       // Add comparator and the midkey atomicreference
257       heapSize += 2 * ClassSize.REFERENCE;
258       return heapSize;
259     }
260 
261     @Override
262     public boolean isEmpty() {
263       return blockKeys.length == 0;
264     }
265 
266     /**
267      * @param i
268      *          from 0 to {@link #getRootBlockCount()} - 1
269      */
270     public Cell getRootBlockKey(int i) {
271       return blockKeys[i];
272     }
273     @Override
274     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
275         boolean cacheBlocks, boolean pread, boolean isCompaction,
276         DataBlockEncoding expectedDataBlockEncoding) throws IOException {
277       int rootLevelIndex = rootBlockContainingKey(key);
278       if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
279         return null;
280       }
281 
282       // the next indexed key
283       Cell nextIndexedKey = null;
284 
285       // Read the next-level (intermediate or leaf) index block.
286       long currentOffset = blockOffsets[rootLevelIndex];
287       int currentOnDiskSize = blockDataSizes[rootLevelIndex];
288 
289       if (rootLevelIndex < blockKeys.length - 1) {
290         nextIndexedKey = blockKeys[rootLevelIndex + 1];
291       } else {
292         nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY;
293       }
294 
295       int lookupLevel = 1; // How many levels deep we are in our lookup.
296       int index = -1;
297 
298       HFileBlock block = null;
299       boolean dataBlock = false;
300       KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
301       while (true) {
302         try {
303           if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
304             // Avoid reading the same block again, even with caching turned off.
305             // This is crucial for compaction-type workload which might have
306             // caching turned off. This is like a one-block cache inside the
307             // scanner.
308             block = currentBlock;
309           } else {
310             // Call HFile's caching block reader API. We always cache index
311             // blocks, otherwise we might get terrible performance.
312             boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
313             BlockType expectedBlockType;
314             if (lookupLevel < searchTreeLevel - 1) {
315               expectedBlockType = BlockType.INTERMEDIATE_INDEX;
316             } else if (lookupLevel == searchTreeLevel - 1) {
317               expectedBlockType = BlockType.LEAF_INDEX;
318             } else {
319               // this also accounts for ENCODED_DATA
320               expectedBlockType = BlockType.DATA;
321             }
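
            // For example, with searchTreeLevel = 3 the first read
            // (lookupLevel = 1) expects an INTERMEDIATE_INDEX block, the
            // second (lookupLevel = 2) a LEAF_INDEX block, and the third
            // (lookupLevel = 3) the DATA block itself.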
322             block =
323                 cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
324                   isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
325           }
326 
327           if (block == null) {
328             throw new IOException("Failed to read block at offset " + currentOffset
329                 + ", onDiskSize=" + currentOnDiskSize);
330           }
331 
332           // Found a data block, break the loop and check our level in the tree.
333           if (block.getBlockType().isData()) {
334             dataBlock = true;
335             break;
336           }
337 
338           // Not a data block. This must be a leaf-level or intermediate-level
339           // index block. We don't allow going deeper than searchTreeLevel.
340           if (++lookupLevel > searchTreeLevel) {
341             throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
342                 + ", searchTreeLevel=" + searchTreeLevel);
343           }
344 
345           // Locate the entry corresponding to the given key in the non-root
346           // (leaf or intermediate-level) index block.
347           ByteBuff buffer = block.getBufferWithoutHeader();
348           index = locateNonRootIndexEntry(buffer, key, comparator);
349           if (index == -1) {
350             // This has to be changed
351             // For now change this to key value
352             KeyValue kv = KeyValueUtil.ensureKeyValue(key);
353             throw new IOException("The key "
354                 + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
355                 + " is before the" + " first key of the non-root index block " + block);
356           }
357 
358           currentOffset = buffer.getLong();
359           currentOnDiskSize = buffer.getInt();
360 
361           // Only update next indexed key if there is a next indexed key in the current level
362           byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
363           if (nonRootIndexedKey != null) {
364             tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
365             nextIndexedKey = tmpNextIndexKV;
366           }
367         } finally {
368           if (!dataBlock) {
369             // Return the block immediately if it is not the
370             // data block
371             cachingBlockReader.returnBlock(block);
372           }
373         }
374       }
375 
376       if (lookupLevel != searchTreeLevel) {
377         assert dataBlock;
378         // Though we have retrieved a data block, we reached it at the
379         // wrong level of the index tree. Return the block so that its
380         // ref count can be decremented before failing.
381         cachingBlockReader.returnBlock(block);
382         throw new IOException("Reached a data block at level " + lookupLevel +
383             " but the number of levels is " + searchTreeLevel);
384       }
385 
386       // set the next indexed key for the current block.
387       BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
388       return blockWithScanInfo;
389     }
390 
391     @Override
392     public Cell midkey() throws IOException {
393       if (rootCount == 0)
394         throw new IOException("HFile empty");
395 
396       Cell targetMidKey = this.midKey.get();
397       if (targetMidKey != null) {
398         return targetMidKey;
399       }
400 
401       if (midLeafBlockOffset >= 0) {
402         if (cachingBlockReader == null) {
403           throw new IOException("Have to read the middle leaf block but " +
404               "no block reader available");
405         }
406 
407         // Caching, using pread, assuming this is not a compaction.
408         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
409             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
410             BlockType.LEAF_INDEX, null);
411         try {
412           ByteBuff b = midLeafBlock.getBufferWithoutHeader();
413           int numDataBlocks = b.getIntAfterPosition(0);
414           int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
415           int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset;
416           int keyOffset =
417               Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
418                   + SECONDARY_INDEX_ENTRY_OVERHEAD;
419           byte[] bytes = b.toBytes(keyOffset, keyLen);
420           targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
421         } finally {
422           cachingBlockReader.returnBlock(midLeafBlock);
423         }
424       } else {
425         // The middle of the root-level index.
426         targetMidKey = blockKeys[rootCount / 2];
427       }
428 
429       this.midKey.set(targetMidKey);
430       return targetMidKey;
431     }
432 
433     protected void initialize(int numEntries) {
434       blockKeys = new Cell[numEntries];
435     }
436 
437     /**
438      * Adds a new entry to the root block index. Only used when reading.
439      *
440      * @param key First key in the block
441      * @param offset file offset where the block is stored
442      * @param dataSize the on-disk data size (version 2) or uncompressed size (version 1)
443      */
444     protected void add(final byte[] key, final long offset, final int dataSize) {
445       blockOffsets[rootCount] = offset;
446       // Create the blockKeys as Cells once when the reader is opened
447       blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
448       blockDataSizes[rootCount] = dataSize;
449       rootCount++;
450     }
451 
452     @Override
453     public int rootBlockContainingKey(final byte[] key, int offset, int length,
454         CellComparator comp) {
455       // This should always be called with Cell not with a byte[] key
456       throw new UnsupportedOperationException("Cannot find for a key containing plain byte " +
457       		"array. Only cell based keys can be searched for");
458     }
459 
460     @Override
461     public int rootBlockContainingKey(Cell key) {
462       // The comparator will not be null here, as this is called on root-level keys
463       int pos = Bytes.binarySearch(blockKeys, key, comparator);
464       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
465       // binarySearch's javadoc.
466 
467       if (pos >= 0) {
468         // This means this is an exact match with an element of blockKeys.
469         assert pos < blockKeys.length;
470         return pos;
471       }
472 
473       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
474       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
475       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
476       // key < blockKeys[0], meaning the file does not contain the given key.
477 
478       int i = -pos - 1;
479       assert 0 <= i && i <= blockKeys.length;
480       return i - 1;
481     }
482 
483     @Override
484     public String toString() {
485       StringBuilder sb = new StringBuilder();
486       sb.append("size=" + rootCount).append("\n");
487       for (int i = 0; i < rootCount; i++) {
488         sb.append("key=").append((blockKeys[i]))
489             .append("\n  offset=").append(blockOffsets[i])
490             .append(", dataSize=" + blockDataSizes[i]).append("\n");
491       }
492       return sb.toString();
493     }
494   }
495    /**
496    * The reader will always hold the root-level index in memory. Index
497    * blocks at all other levels will be cached in the LRU cache in practice,
498    * although this API does not enforce that.
499    *
500    * All non-root (leaf and intermediate) index blocks contain what we call a
501    * "secondary index": an array of offsets to the entries within the block.
502    * This allows us to do binary search for the entry corresponding to the
503    * given key without having to deserialize the block.
504    */
505    static abstract class BlockIndexReader implements HeapSize {
506 
507     protected long[] blockOffsets;
508     protected int[] blockDataSizes;
509     protected int rootCount = 0;
510 
511     // Mid-key metadata.
512     protected long midLeafBlockOffset = -1;
513     protected int midLeafBlockOnDiskSize = -1;
514     protected int midKeyEntry = -1;
515 
516     /**
517      * The number of levels in the block index tree. One if there is only root
518      * level, two for root and leaf levels, etc.
519      */
520     protected int searchTreeLevel;
521 
522     /** A way to read {@link HFile} blocks at a given offset */
523     protected CachingBlockReader cachingBlockReader;
524 
525     /**
526      * @return true if the block index is empty.
527      */
528     public abstract boolean isEmpty();
529 
530     /**
531      * Verifies that the block index is non-empty and throws an
532      * {@link IllegalStateException} otherwise.
533      */
534     public void ensureNonEmpty() {
535       if (isEmpty()) {
536         throw new IllegalStateException("Block index is empty or not loaded");
537       }
538     }
539 
540     /**
541      * Return the data block which contains this key. This function will only
542      * be called when the HFile version is larger than 1.
543      *
544      * @param key the key we are looking for
545      * @param currentBlock the current block, to avoid re-reading the same block
546      * @param cacheBlocks whether to cache the blocks read
547      * @param pread whether to use positional read
548      * @param isCompaction whether the read is part of a compaction
549      * @param expectedDataBlockEncoding the data block encoding the caller is
550      *          expecting the data block to be in, or null to not perform this
551      *          check and return the block irrespective of the encoding
552      * @return the data block containing the given key, or null if not found
553      * @throws IOException
554      */
555     public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
556         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
557         throws IOException {
558       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
559           cacheBlocks,
560           pread, isCompaction, expectedDataBlockEncoding);
561       if (blockWithScanInfo == null) {
562         return null;
563       } else {
564         return blockWithScanInfo.getHFileBlock();
565       }
566     }
567 
568     /**
569      * Return the BlockWithScanInfo which contains the DataBlock with other scan
570      * info such as nextIndexedKey. This function will only be called when the
571      * HFile version is larger than 1.
572      * 
573      * @param key
574      *          the key we are looking for
575      * @param currentBlock
576      *          the current block, to avoid re-reading the same block
577      * @param cacheBlocks whether to cache the blocks read
578      * @param pread whether to use positional read
579      * @param isCompaction whether the read is part of a compaction
580      * @param expectedDataBlockEncoding the data block encoding the caller is
581      *          expecting the data block to be in, or null to not perform this
582      *          check and return the block irrespective of the encoding.
583      * @return the BlockWithScanInfo which contains the DataBlock with other
584      *         scan info such as nextIndexedKey.
585      * @throws IOException
586      */
587     public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
588         boolean cacheBlocks,
589         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
590         throws IOException;
591 
592     /**
593      * An approximation to the {@link HFile}'s mid-key. Operates on block
594      * boundaries, and does not go inside blocks. In other words, returns the
595      * first key of the middle block of the file.
596      *
597      * @return the first key of the middle block
598      */
599     public abstract Cell midkey() throws IOException;
600 
601     /**
602      * @param i from 0 to {@link #getRootBlockCount()} - 1
603      */
604     public long getRootBlockOffset(int i) {
605       return blockOffsets[i];
606     }
607 
608     /**
609      * @param i zero-based index of a root-level block
610      * @return the on-disk size of the root-level block for version 2, or the
611      *         uncompressed size for version 1
612      */
613     public int getRootBlockDataSize(int i) {
614       return blockDataSizes[i];
615     }
616 
617     /**
618      * @return the number of root-level blocks in this block index
619      */
620     public int getRootBlockCount() {
621       return rootCount;
622     }
623 
624     /**
625      * Finds the root-level index block containing the given key.
626      * 
627      * @param key
628      *          Key to find
629      * @param comp
630      *          the comparator to be used
631      * @return Index of the root-level block containing <code>key</code>
632      *         (between 0 and the number of blocks - 1), or -1 if this file
633      *         does not contain the request.
634      */
635     // When we want to find the meta index block or bloom block for ROW bloom
636     // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
637     // CellComparator.
638     public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
639         CellComparator comp);
640 
641     /**
642      * Finds the root-level index block containing the given key.
643      *
644      * @param key
645      *          Key to find
646      * @return Index of the root-level block containing <code>key</code>
647      *         (between 0 and the number of blocks - 1), or -1 if this file
648      *         does not contain the request.
649      */
650     // When we want to find the meta index block or bloom block for ROW bloom
651     // type
652     // Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we
653     // need the CellComparator.
654     public int rootBlockContainingKey(final byte[] key, int offset, int length) {
655       return rootBlockContainingKey(key, offset, length, null);
656     }
657 
658     /**
659      * Finds the root-level index block containing the given key.
660      *
661      * @param key
662      *          Key to find
663      */
664     public abstract int rootBlockContainingKey(final Cell key);
665 
666     /**
667      * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
668      * @param nonRootIndex the non-root index block buffer
669      * @param i the ith position
670      * @return The indexed key at the ith position, or null if i is out of range.
671      */
672     protected byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
673       int numEntries = nonRootIndex.getInt(0);
674       if (i < 0 || i >= numEntries) {
675         return null;
676       }
677 
678       // Entries start after the number of entries and the secondary index.
679       // The secondary index takes numEntries + 1 ints.
680       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
681       // Target key's offset relative to the end of the secondary index
682       int targetKeyRelOffset = nonRootIndex.getInt(
683           Bytes.SIZEOF_INT * (i + 1));
684 
685       // The offset of the target key in the blockIndex buffer
686       int targetKeyOffset = entriesOffset     // Skip secondary index
687           + targetKeyRelOffset               // Skip all entries until mid
688           + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
689 
690       // We subtract the two consecutive secondary index elements, which
691       // gives us the size of the whole (offset, onDiskSize, key) tuple. We
692       // then need to subtract the overhead of offset and onDiskSize.
693       int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
694         targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
695 
696       // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
697       return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
698     }
699 
700     /**
701      * Performs a binary search over a non-root level index block. Utilizes the
702      * secondary index, which records the offsets of (offset, onDiskSize,
703      * firstKey) tuples of all entries.
704      * 
705      * @param key
706      *          the key we are searching for offsets to individual entries in
707      *          the blockIndex buffer
708      * @param nonRootIndex
709      *          the non-root index block buffer, starting with the secondary
710      *          index. The position is ignored.
711      * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
712      *         keys[i + 1], if keys is the array of all keys being searched, or
713      *         -1 otherwise
714      * @throws IOException
715      */
716     static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
717         CellComparator comparator) {
718 
719       int numEntries = nonRootIndex.getIntAfterPosition(0);
720       int low = 0;
721       int high = numEntries - 1;
722       int mid = 0;
723 
724       // Entries start after the number of entries and the secondary index.
725       // The secondary index takes numEntries + 1 ints.
726       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
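
      // A hypothetical example: with numEntries = 3 the secondary index
      // occupies the four ints at byte offsets 4..19, so the first entry
      // begins at entriesOffset = 4 * (3 + 2) = 20.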
727 
728       // If we imagine that keys[-1] = -Infinity and
729       // keys[numEntries] = Infinity, then we are maintaining an invariant that
730       // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
731       KeyValue.KeyOnlyKeyValue nonRootIndexKV = new KeyValue.KeyOnlyKeyValue();
732       Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>();
733       while (low <= high) {
734         mid = (low + high) >>> 1;
735 
736         // Midkey's offset relative to the end of secondary index
737         int midKeyRelOffset = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 1));
738 
739         // The offset of the middle key in the blockIndex buffer
740         int midKeyOffset = entriesOffset       // Skip secondary index
741             + midKeyRelOffset                  // Skip all entries until mid
742             + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
743 
744         // We subtract the two consecutive secondary index elements, which
745         // gives us the size of the whole (offset, onDiskSize, key) tuple. We
746         // then need to subtract the overhead of offset and onDiskSize.
747         int midLength = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 2)) -
748             midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
749 
750         // we have to compare in this order, because the comparator order
751         // has special logic when the 'left side' is a special key.
752         // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
753         // done after HBASE-12224 & HBASE-12282
754         // TODO avoid array call.
755         nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
756         nonRootIndexKV.setKey(pair.getFirst().array(),
757             pair.getFirst().arrayOffset() + pair.getSecond(), midLength);
758         int cmp = comparator.compareKeyIgnoresMvcc(key, nonRootIndexKV);
759 
760         // key lives above the midpoint
761         if (cmp > 0)
762           low = mid + 1; // Maintain the invariant that keys[low - 1] < key
763         // key lives below the midpoint
764         else if (cmp < 0)
765           high = mid - 1; // Maintain the invariant that key < keys[high + 1]
766         else
767           return mid; // exact match
768       }
769 
770       // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
771       // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
772       // condition, low >= high + 1. Therefore, low = high + 1.
773 
774       if (low != high + 1) {
775         throw new IllegalStateException("Binary search broken: low=" + low
776             + " " + "instead of " + (high + 1));
777       }
778 
779       // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
780       // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
781       int i = low - 1;
782 
783       // Some extra validation on the result.
784       if (i < -1 || i >= numEntries) {
785         throw new IllegalStateException("Binary search broken: result is " +
786             i + " but expected to be between -1 and (numEntries - 1) = " +
787             (numEntries - 1));
788       }
789 
790       return i;
791     }
792 
793     /**
794      * Search for one key using the secondary index in a non-root block. In case
795      * of success, positions the provided buffer at the entry of interest, where
796      * the file offset and the on-disk-size can be read.
797      *
798      * @param nonRootBlock
799      *          a non-root block without header. Initial position does not
800      *          matter.
801      * @param key
802      *          the byte array containing the key
803      * @return the index position where the given key was found, or -1 if
804      *         the given key is before the first key
805      *
806      */
807     static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
808         CellComparator comparator) {
809       int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
810 
811       if (entryIndex != -1) {
812         int numEntries = nonRootBlock.getIntAfterPosition(0);
813 
814         // The end of secondary index and the beginning of entries themselves.
815         int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
816 
817         // The offset of the entry we are interested in relative to the end of
818         // the secondary index.
819         int entryRelOffset = nonRootBlock
820             .getIntAfterPosition(Bytes.SIZEOF_INT * (1 + entryIndex));
821 
822         nonRootBlock.position(entriesOffset + entryRelOffset);
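
        // The buffer is now positioned at the entry itself, so the caller can
        // read the deeper-level block's file offset (long) and on-disk size
        // (int) directly, as loadDataBlockWithScanInfo does.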
823       }
824 
825       return entryIndex;
826     }
827 
828     /**
829      * Read in the root-level index from the given input stream. Must match
830      * what was written into the root level by
831      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
832      * offset that function returned.
833      *
834      * @param in the buffered input stream or wrapped byte input stream
835      * @param numEntries the number of root-level index entries
836      * @throws IOException
837      */
838     public void readRootIndex(DataInput in, final int numEntries) throws IOException {
839       blockOffsets = new long[numEntries];
840       initialize(numEntries);
841       blockDataSizes = new int[numEntries];
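
      // Root-level record layout, one record per entry: offset (long),
      // on-disk size (int), then the key (a vint length followed by the
      // key bytes, per Bytes.readByteArray).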
842 
843       // If index size is zero, no index was written.
844       if (numEntries > 0) {
845         for (int i = 0; i < numEntries; ++i) {
846           long offset = in.readLong();
847           int dataSize = in.readInt();
848           byte[] key = Bytes.readByteArray(in);
849           add(key, offset, dataSize);
850         }
851       }
852     }
853 
854     protected abstract void initialize(int numEntries);
855 
856     protected abstract void add(final byte[] key, final long offset, final int dataSize);
857 
858     /**
859      * Read in the root-level index from the given block. Must match
860      * what was written into the root level by
861      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
862      * offset that function returned.
863      *
864      * @param blk the HFile block
865      * @param numEntries the number of root-level index entries
866      * @return the buffered input stream or wrapped byte input stream
867      * @throws IOException
868      */
869     public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
870       DataInputStream in = blk.getByteStream();
871       readRootIndex(in, numEntries);
872       return in;
873     }
874 
875     /**
876      * Read the root-level metadata of a multi-level block index. Based on
877      * {@link #readRootIndex(DataInput, int)}, but also reads metadata
878      * necessary to compute the mid-key in a multi-level index.
879      *
880      * @param blk the HFile block
881      * @param numEntries the number of root-level index entries
882      * @throws IOException
883      */
884     public void readMultiLevelIndexRoot(HFileBlock blk,
885         final int numEntries) throws IOException {
886       DataInputStream in = readRootIndex(blk, numEntries);
887       // After reading the root index, the checksum bytes have to be
888       // subtracted from the remaining bytes to know if mid-key metadata exists.
889       int checkSumBytes = blk.totalChecksumBytes();
890       if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
891         // No mid-key metadata available.
892         return;
893       }
894       midLeafBlockOffset = in.readLong();
895       midLeafBlockOnDiskSize = in.readInt();
896       midKeyEntry = in.readInt();
897     }
898 
899     @Override
900     public long heapSize() {
901       // Base class: 3 references, 2 ints, object header. Block keys, comparator and midKey are accounted for in the subclasses.
902       long heapSize = ClassSize.align(3 * ClassSize.REFERENCE +
903           2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
904 
905       // Mid-key metadata.
906       heapSize += MID_KEY_METADATA_SIZE;
907 
908       heapSize = calculateHeapSizeForBlockKeys(heapSize);
909 
910       if (blockOffsets != null) {
911         heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
912             * Bytes.SIZEOF_LONG);
913       }
914 
915       if (blockDataSizes != null) {
916         heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
917             * Bytes.SIZEOF_INT);
918       }
919 
920       return ClassSize.align(heapSize);
921     }
922 
923     protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
924   }
925 
926   /**
927    * Writes the block index into the output stream. Generates the tree from
928    * the bottom up. The leaf level is written to disk as a sequence of inline
929    * blocks, if it is larger than a certain number of bytes. If the leaf level
930    * is not large enough, we write all entries to the root level instead.
931    *
932    * After all leaf blocks have been written, we end up with an index
933    * referencing the resulting leaf index blocks. If that index is larger than
934    * the allowed root index size, the writer will break it up into
935    * reasonable-size intermediate-level index block chunks, write those chunks
936    * out, and create another index referencing those chunks. This will be
937    * repeated until the remaining index is small enough to become the root
938    * index. However, in most practical cases we will only have leaf-level
939    * blocks and the root index, or just the root index.
940    */
941   public static class BlockIndexWriter implements InlineBlockWriter {
942     /**
943      * While the index is being written, this represents the current block
944      * index referencing all leaf blocks, with one exception. If the file is
945      * being closed and there are not enough blocks to complete even a single
946      * leaf block, no leaf blocks get written and this contains the entire
947    * block index. After all levels of the index have been written by
948      * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
949      * root-level index.
950      */
951     private BlockIndexChunk rootChunk = new BlockIndexChunk();
952 
953     /**
954      * Current leaf-level chunk. New entries referencing data blocks get added
955      * to this chunk until it grows large enough to be written to disk.
956      */
957     private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
958 
959     /**
960      * The number of block index levels. This is one if there is only a root
961      * level (even empty), two if there are a leaf level and a root level, and is
962      * higher if there are intermediate levels. This is only final after
963      * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
964      * initial value accounts for the root level, and will be increased to two
965      * as soon as we find out there is a leaf-level in
966      * {@link #blockWritten(long, int, int)}.
967      */
968     private int numLevels = 1;
969 
970     private HFileBlock.Writer blockWriter;
971     private byte[] firstKey = null;
972 
973     /**
974      * The total number of leaf-level entries, i.e. entries referenced by
975      * leaf-level blocks. For the data block index this is equal to the number
976      * of data blocks.
977      */
978     private long totalNumEntries;
979 
980     /** Total compressed size of all index blocks. */
981     private long totalBlockOnDiskSize;
982 
983     /** Total uncompressed size of all index blocks. */
984     private long totalBlockUncompressedSize;
985 
986     /** The maximum size guideline of all multi-level index blocks. */
987     private int maxChunkSize;
988 
989     /** Whether we require this block index to always be single-level. */
990     private boolean singleLevelOnly;
991 
992     /** CacheConfig, or null if cache-on-write is disabled */
993     private CacheConfig cacheConf;
994 
995     /** Name to use for computing cache keys */
996     private String nameForCaching;
997 
998     /** Creates a single-level block index writer */
999     public BlockIndexWriter() {
1000       this(null, null, null);
1001       singleLevelOnly = true;
1002     }
1003 
1004     /**
1005      * Creates a multi-level block index writer.
1006      *
1007      * @param blockWriter the block writer to use to write index blocks
1008      * @param cacheConf used to determine when and how a block should be cached-on-write.
1009      */
1010     public BlockIndexWriter(HFileBlock.Writer blockWriter,
1011         CacheConfig cacheConf, String nameForCaching) {
1012       if ((cacheConf == null) != (nameForCaching == null)) {
1013         throw new IllegalArgumentException("Block cache and file name for " +
1014             "caching must be both specified or both null");
1015       }
1016 
1017       this.blockWriter = blockWriter;
1018       this.cacheConf = cacheConf;
1019       this.nameForCaching = nameForCaching;
1020       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
1021     }
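
    /*
     * A rough usage sketch (hypothetical caller, for illustration only):
     * the owning writer calls addEntry(...) once per data block; between
     * blocks it polls shouldWriteBlock(false) and, when that returns true,
     * calls writeInlineBlock(out) followed by blockWritten(...) with the
     * flushed block's coordinates; finally, at close time, it calls
     * writeIndexBlocks(out) to emit the intermediate and root levels.
     */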
1022 
1023     public void setMaxChunkSize(int maxChunkSize) {
1024       if (maxChunkSize <= 0) {
1025         throw new IllegalArgumentException("Invald maximum index block size");
1026       }
1027       this.maxChunkSize = maxChunkSize;
1028     }
1029 
1030     /**
1031      * Writes the root level and intermediate levels of the block index into
1032      * the output stream, generating the tree from bottom up. Assumes that the
1033      * leaf level has been inline-written to the disk if there is enough data
1034      * for more than one leaf block. We iterate by breaking the current level
1035      * of the block index, starting with the index of all leaf-level blocks,
1036      * into chunks small enough to be written to disk, and generate its parent
1037      * level, until we end up with a level small enough to become the root
1038      * level.
1039      *
1040      * If the leaf level is not large enough, there is no inline block index
1041      * anymore, so we only write that level of block index to disk as the root
1042      * level.
1043      *
1044      * @param out FSDataOutputStream
1045      * @return the position in the output stream at which the root-level index starts
1046      * @throws IOException
1047      */
1048     public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
1049       if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
1050         throw new IOException("Trying to write a multi-level block index, " +
1051             "but are " + curInlineChunk.getNumEntries() + " entries in the " +
1052             "last inline chunk.");
1053       }
1054 
1055       // We need to get mid-key metadata before we create intermediate
1056       // indexes and overwrite the root chunk.
1057       byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
1058           : null;
1059 
1060       if (curInlineChunk != null) {
1061         while (rootChunk.getRootSize() > maxChunkSize) {
1062           rootChunk = writeIntermediateLevel(out, rootChunk);
1063           numLevels += 1;
1064         }
1065       }
1066 
1067       // write the root level
1068       long rootLevelIndexPos = out.getPos();
1069 
1070       {
1071         DataOutput blockStream =
1072             blockWriter.startWriting(BlockType.ROOT_INDEX);
1073         rootChunk.writeRoot(blockStream);
1074         if (midKeyMetadata != null)
1075           blockStream.write(midKeyMetadata);
1076         blockWriter.writeHeaderAndData(out);
1077       }
1078 
1079       // Add root index block size
1080       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1081       totalBlockUncompressedSize +=
1082           blockWriter.getUncompressedSizeWithoutHeader();
1083 
1084       if (LOG.isTraceEnabled()) {
1085         LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
1086           + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
1087           + " root-level entries, " + totalNumEntries + " total entries, "
1088           + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
1089           " on-disk size, "
1090           + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
1091           " total uncompressed size.");
1092       }
1093       return rootLevelIndexPos;
1094     }
1095 
1096     /**
1097      * Writes the block index data as a single level only. Does not do any
1098      * block framing.
1099      *
1100      * @param out the buffered output stream to write the index to. Typically a
1101      *          stream writing into an {@link HFile} block.
1102      * @param description a short description of the index being written. Used
1103      *          in a log message.
1104      * @throws IOException
1105      */
1106     public void writeSingleLevelIndex(DataOutput out, String description)
1107         throws IOException {
1108       expectNumLevels(1);
1109 
1110       if (!singleLevelOnly)
1111         throw new IOException("Single-level mode is turned off");
1112 
1113       if (rootChunk.getNumEntries() > 0)
1114         throw new IOException("Root-level entries already added in " +
1115             "single-level mode");
1116 
1117       rootChunk = curInlineChunk;
1118       curInlineChunk = new BlockIndexChunk();
1119 
1120       if (LOG.isTraceEnabled()) {
1121         LOG.trace("Wrote a single-level " + description + " index with "
1122           + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
1123           + " bytes");
1124       }
1125       rootChunk.writeRoot(out);
1126     }
1127 
1128     /**
1129      * Split the current level of the block index into intermediate index
1130      * blocks of permitted size and write those blocks to disk. Return the next
1131      * level of the block index referencing those intermediate-level blocks.
1132      *
1133      * @param out the output stream to write the index blocks to
1134      * @param currentLevel the current level of the block index, such as a
1135      *          chunk referencing all leaf-level index blocks
1136      * @return the parent level block index, which becomes the root index after
1137      *         a few (usually zero) iterations
1138      * @throws IOException
1139      */
1140     private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
1141         BlockIndexChunk currentLevel) throws IOException {
1142       // Entries referencing intermediate-level blocks we are about to create.
1143       BlockIndexChunk parent = new BlockIndexChunk();
1144 
1145       // The current intermediate-level block index chunk.
1146       BlockIndexChunk curChunk = new BlockIndexChunk();
1147 
1148       for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
1149         curChunk.add(currentLevel.getBlockKey(i),
1150             currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
1151 
1152         if (curChunk.getRootSize() >= maxChunkSize)
1153           writeIntermediateBlock(out, parent, curChunk);
1154       }
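
      // Note that the size check runs after each add, so a chunk is flushed
      // once its root-format size reaches maxChunkSize; a chunk may therefore
      // slightly overshoot the guideline by up to one entry.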
1155 
1156       if (curChunk.getNumEntries() > 0) {
1157         writeIntermediateBlock(out, parent, curChunk);
1158       }
1159 
1160       return parent;
1161     }
1162 
1163     private void writeIntermediateBlock(FSDataOutputStream out,
1164         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
1165       long beginOffset = out.getPos();
1166       DataOutputStream dos = blockWriter.startWriting(
1167           BlockType.INTERMEDIATE_INDEX);
1168       curChunk.writeNonRoot(dos);
1169       byte[] curFirstKey = curChunk.getBlockKey(0);
1170       blockWriter.writeHeaderAndData(out);
1171 
1172       if (cacheConf != null) {
1173         HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1174         cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
1175           beginOffset), blockForCaching);
1176       }
1177 
1178       // Add intermediate index block size
1179       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1180       totalBlockUncompressedSize +=
1181           blockWriter.getUncompressedSizeWithoutHeader();
1182 
1183       // OFFSET is the beginning offset of the chunk of block index entries.
1184       // SIZE is the total byte size of the chunk of block index entries
1185       // + the secondary index size
1186       // FIRST_KEY is the first key in the chunk of block index
1187       // entries.
1188       parent.add(curFirstKey, beginOffset,
1189           blockWriter.getOnDiskSizeWithHeader());
1190 
1191       // clear current block index chunk
1192       curChunk.clear();
1193       curFirstKey = null;
1194     }
1195 
1196     /**
1197      * @return how many block index entries there are in the root level
1198      */
1199     public final int getNumRootEntries() {
1200       return rootChunk.getNumEntries();
1201     }
1202 
1203     /**
1204      * @return the number of levels in this block index.
1205      */
1206     public int getNumLevels() {
1207       return numLevels;
1208     }
1209 
1210     private void expectNumLevels(int expectedNumLevels) {
1211       if (numLevels != expectedNumLevels) {
1212         throw new IllegalStateException("Number of block index levels is "
1213             + numLevels + "but is expected to be " + expectedNumLevels);
1214       }
1215     }
1216 
1217     /**
1218      * Whether there is an inline block ready to be written. In general, we
1219      * write a leaf-level index block as an inline block as soon as its size
1220      * as serialized in the non-root format reaches a certain threshold.
1221      */
1222     @Override
1223     public boolean shouldWriteBlock(boolean closing) {
1224       if (singleLevelOnly) {
1225         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1226       }
1227 
1228       if (curInlineChunk == null) {
1229         throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
1230             "called with closing=true and then called again?");
1231       }
1232 
1233       if (curInlineChunk.getNumEntries() == 0) {
1234         return false;
1235       }
1236 
1237       // We do have some entries in the current inline chunk.
1238       if (closing) {
1239         if (rootChunk.getNumEntries() == 0) {
1240           // We did not add any leaf-level blocks yet. Instead of creating a
1241           // leaf level with one block, move these entries to the root level.
1242 
1243           expectNumLevels(1);
1244           rootChunk = curInlineChunk;
1245           curInlineChunk = null;  // Disallow adding any more index entries.
1246           return false;
1247         }
1248 
1249         return true;
1250       } else {
1251         return curInlineChunk.getNonRootSize() >= maxChunkSize;
1252       }
1253     }
1254 
1255     /**
1256      * Write out the current inline index block. Inline blocks are non-root
1257      * blocks, so the non-root index format is used.
1258      *
1259      * @param out the data output stream to write the inline block to
1260      */
1261     @Override
1262     public void writeInlineBlock(DataOutput out) throws IOException {
1263       if (singleLevelOnly)
1264         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1265 
1266       // Write the inline block index to the output stream in the non-root
1267       // index block format.
1268       curInlineChunk.writeNonRoot(out);
1269 
1270       // Save the first key of the inline block so that we can add it to the
1271       // parent-level index.
1272       firstKey = curInlineChunk.getBlockKey(0);
1273 
1274       // Start a new inline index block
1275       curInlineChunk.clear();
1276     }
1277 
1278     /**
1279      * Called after an inline block has been written so that we can add an
1280      * entry referring to that block to the parent-level index.
1281      */
1282     @Override
1283     public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
1284       // Add leaf index block size
1285       totalBlockOnDiskSize += onDiskSize;
1286       totalBlockUncompressedSize += uncompressedSize;
1287 
1288       if (singleLevelOnly)
1289         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1290 
1291       if (firstKey == null) {
1292         throw new IllegalStateException("Trying to add second-level index " +
1293             "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
1294             "but the first key was not set in writeInlineBlock");
1295       }
1296 
1297       if (rootChunk.getNumEntries() == 0) {
1298         // We are writing the first leaf block, so increase index level.
1299         expectNumLevels(1);
1300         numLevels = 2;
1301       }
1302 
1303       // Add another entry to the second-level index. Include the number of
1304       // entries in all previous leaf-level chunks for mid-key calculation.
1305       rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1306       firstKey = null;
1307     }
1308 
1309     @Override
1310     public BlockType getInlineBlockType() {
1311       return BlockType.LEAF_INDEX;
1312     }
1313 
1314     /**
1315      * Add one index entry to the current leaf-level block. When the leaf-level
1316      * block gets large enough, it will be flushed to disk as an inline block.
1317      *
1318      * @param firstKey the first key of the data block
1319      * @param blockOffset the offset of the data block
1320      * @param blockDataSize the on-disk size of the data block ({@link HFile}
1321      *          format version 2), or the uncompressed size of the data block (
1322      *          {@link HFile} format version 1).
1323      */
1324     public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
1325       curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1326       ++totalNumEntries;
1327     }
1328 
1329     /**
1330      * @throws IOException if we happened to write a multi-level index.
1331      */
1332     public void ensureSingleLevel() throws IOException {
1333       if (numLevels > 1) {
1334         throw new IOException("Wrote a " + numLevels + "-level index with " +
1335             rootChunk.getNumEntries() + " root-level entries, but " +
1336             "this is expected to be a single-level block index.");
1337       }
1338     }
1339 
1340     /**
1341      * @return true if we are using cache-on-write. This is configured by the
1342      *         caller of the constructor by passing either a valid cache
1343      *         configuration or null.
1344      */
1345     @Override
1346     public boolean getCacheOnWrite() {
1347       return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1348     }
1349 
1350     /**
1351      * The total uncompressed size of the root index block, intermediate-level
1352      * index blocks, and leaf-level index blocks.
1353      *
1354      * @return the total uncompressed size of all index blocks
1355      */
1356     public long getTotalUncompressedSize() {
1357       return totalBlockUncompressedSize;
1358     }
1359 
1360   }
1361 
1362   /**
1363    * A single chunk of the block index in the process of writing. The data in
1364    * this chunk can become a leaf-level, intermediate-level, or root index
1365    * block.
1366    */
1367   static class BlockIndexChunk {
1368 
1369     /** First keys of the key range corresponding to each index entry. */
1370     private final List<byte[]> blockKeys = new ArrayList<byte[]>();
1371 
1372     /** Block offset in backing stream. */
1373     private final List<Long> blockOffsets = new ArrayList<Long>();
1374 
1375     /** On-disk data sizes of lower-level data or index blocks. */
1376     private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
1377 
1378     /**
1379      * The cumulative number of sub-entries, i.e. entries in the deeper-level
1380      * blocks this chunk's entries point to. numSubEntriesAt[i] is the number of
1381      * sub-entries in the blocks corresponding to entries #0 through #i inclusive.
1382      */
1383     private final List<Long> numSubEntriesAt = new ArrayList<Long>();
1384 
1385     /**
1386      * The offset of the next entry to be added, relative to the end of the
1387      * "secondary index" in the "non-root" format representation of this index
1388      * chunk. This is the next value to be added to the secondary index.
1389      */
1390     private int curTotalNonRootEntrySize = 0;
1391 
1392     /**
1393      * The accumulated size of this chunk if stored in the root index format.
1394      */
1395     private int curTotalRootSize = 0;
1396 
1397     /**
1398      * The "secondary index" used for binary search over variable-length
1399      * records in a "non-root" format block. These offsets are relative to the
1400      * end of this secondary index.
1401      */
1402     private final List<Integer> secondaryIndexOffsetMarks =
1403         new ArrayList<Integer>();
1404 
1405     /**
1406      * Adds a new entry to this block index chunk.
1407      *
1408      * @param firstKey the first key in the block pointed to by this entry
1409      * @param blockOffset the offset of the next-level block pointed to by this
1410      *          entry
1411      * @param onDiskDataSize the on-disk data size of the block pointed to by this
1412      *          entry, including header size
1413      * @param curTotalNumSubEntries if this chunk is the root index chunk under
1414      *          construction, this specifies the current total number of
1415      *          sub-entries in all leaf-level chunks, including the one
1416      *          corresponding to the second-level entry being added.
1417      */
1418     void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1419         long curTotalNumSubEntries) {
1420       // Record the offset for the secondary index
1421       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1422       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1423           + firstKey.length;
1424 
1425       curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1426           + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1427 
1428       blockKeys.add(firstKey);
1429       blockOffsets.add(blockOffset);
1430       onDiskDataSizes.add(onDiskDataSize);
1431 
1432       if (curTotalNumSubEntries != -1) {
1433         numSubEntriesAt.add(curTotalNumSubEntries);
1434 
1435         // Make sure the parallel arrays are in sync.
1436         if (numSubEntriesAt.size() != blockKeys.size()) {
1437           throw new IllegalStateException("Only have key/value count " +
1438               "stats for " + numSubEntriesAt.size() + " block index " +
1439               "entries out of " + blockKeys.size());
1440         }
1441       }
1442     }
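         // Worked example (illustrative numbers): adding an entry with a 20-byte
         // firstKey records the previous curTotalNonRootEntrySize in
         // secondaryIndexOffsetMarks and then grows
         //   curTotalNonRootEntrySize by SECONDARY_INDEX_ENTRY_OVERHEAD + 20 = 32,
         //   curTotalRootSize by 8 (offset) + 4 (size) + 1 (vInt length) + 20 = 33.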
1443 
1444     /**
1445      * The same as {@link #add(byte[], long, int, long)} but does not keep track
1446      * of the sub-entry count. Used for single-level indexes.
1447      *
1448      * @see #add(byte[], long, int, long)
1449      */
1450     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1451       add(firstKey, blockOffset, onDiskDataSize, -1);
1452     }
1453 
1454     public void clear() {
1455       blockKeys.clear();
1456       blockOffsets.clear();
1457       onDiskDataSizes.clear();
1458       secondaryIndexOffsetMarks.clear();
1459       numSubEntriesAt.clear();
1460       curTotalNonRootEntrySize = 0;
1461       curTotalRootSize = 0;
1462     }
1463 
1464     /**
1465      * Finds the entry corresponding to the deeper-level index block containing
1466      * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1467      * ordering of sub-entries.
1468      *
1469      * <p>
1470      * <i> Implementation note. </i> We are looking for i such that
1471      * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because deeper-level
1472      * block #i (0-based) contains sub-entries numSubEntriesAt[i - 1] through
1473      * numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
1474      * sub-entries. i is by definition the insertion point of k in
1475      * numSubEntriesAt.
1476      *
1477      * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1478      * @return the 0-based index of the entry corresponding to the given
1479      *         sub-entry
1480      */
1481     public int getEntryBySubEntry(long k) {
1482       // We define mid-key as the key corresponding to k'th sub-entry
1483       // (0-based).
1484 
1485       int i = Collections.binarySearch(numSubEntriesAt, k);
1486 
1487       // Exact match: numSubEntriesAt[i] == k. This means chunks #0 through
1488       // #i contain exactly k sub-entries, so sub-entry #k (0-based)
1489       // is the first sub-entry in chunk #(i + 1).
1490       if (i >= 0)
1491         return i + 1;
1492 
1493       // Inexact match. Return the insertion point.
1494       return -i - 1;
1495     }
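         // Worked example (illustrative): with numSubEntriesAt = [10, 25, 40],
         // getEntryBySubEntry(17) sees binarySearch return -2 (insertion point
         // 1) and returns -(-2) - 1 = 1: sub-entry #17 lives in chunk #1, which
         // holds sub-entries #10 through #24. The exact match k = 25 returns
         // i + 1 = 2, since chunks #0..#1 hold sub-entries #0..#24 and
         // sub-entry #25 is the first one in chunk #2.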
1496 
1497     /**
1498      * Used when writing the root block index of a multi-level block index.
1499      * Serializes additional information that allows the mid-key to be
1500      * identified efficiently.
1501      *
1502      * @return a few serialized fields for finding the mid-key
1503      * @throws IOException if mid-key metadata could not be created
1504      */
1505     public byte[] getMidKeyMetadata() throws IOException {
1506       ByteArrayOutputStream baos = new ByteArrayOutputStream(
1507           MID_KEY_METADATA_SIZE);
1508       DataOutputStream baosDos = new DataOutputStream(baos);
1509       long totalNumSubEntries = numSubEntriesAt.isEmpty() ? 0 : numSubEntriesAt.get(blockKeys.size() - 1);
1510       if (totalNumSubEntries == 0) {
1511         throw new IOException("No leaf-level entries, mid-key unavailable");
1512       }
1513       long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1514       int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1515 
1516       baosDos.writeLong(blockOffsets.get(midKeyEntry));
1517       baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1518 
1519       long numSubEntriesBefore = midKeyEntry > 0
1520           ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1521       long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1522       if (subEntryWithinEntry < 0
1523           || subEntryWithinEntry > Integer.MAX_VALUE) {
1524         throw new IOException("Could not identify mid-key index within the "
1525             + "leaf-level block containing mid-key: out of range ("
1526             + subEntryWithinEntry + ", numSubEntriesBefore="
1527             + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1528             + ")");
1529       }
1530 
1531       baosDos.writeInt((int) subEntryWithinEntry);
1532 
1533       if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1534         throw new IOException("Could not write mid-key metadata: size=" +
1535             baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1536       }
1537 
1538       // Close just to be good citizens, although this has no effect.
1539       baos.close();
1540 
1541       return baos.toByteArray();
1542     }
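         // The metadata written above is MID_KEY_METADATA_SIZE = 8 + 4 + 4 = 16
         // bytes: the offset (long) and on-disk size (int) of the leaf block
         // holding the mid-key, then the mid-key's index (int) within that
         // leaf. Illustrative example: for numSubEntriesAt = [10, 25, 40],
         // totalNumSubEntries = 40, midKeySubEntry = 39 / 2 = 19, midKeyEntry =
         // getEntryBySubEntry(19) = 1, and the index written is 19 - 10 = 9.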
1543 
1544     /**
1545      * Writes the block index chunk in the non-root index block format. This
1546      * format contains the number of entries, an index of integer offsets
1547      * for quick binary search on variable-length records, and tuples of
1548      * block offset, on-disk block size, and the first key for each entry.
1549      *
1550      * @param out the data output to write this chunk to
1551      * @throws IOException if the write fails
1552      */
1553     void writeNonRoot(DataOutput out) throws IOException {
1554       // The number of entries in the block.
1555       out.writeInt(blockKeys.size());
1556 
1557       if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1558         throw new IOException("Corrupted block index chunk writer: " +
1559             blockKeys.size() + " entries but " +
1560             secondaryIndexOffsetMarks.size() + " secondary index items");
1561       }
1562 
1563       // For each entry, write a "secondary index" of relative offsets to the
1564       // entries from the end of the secondary index. This works because at
1565       // read time we read the number of entries and know where the secondary
1566       // index ends.
1567       for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
1568         out.writeInt(currentSecondaryIndex);
1569 
1570       // We include one extra element in the secondary index so that the size of
1571       // each entry can be computed by subtracting adjacent secondary index elements.
1572       out.writeInt(curTotalNonRootEntrySize);
1573 
1574       for (int i = 0; i < blockKeys.size(); ++i) {
1575         out.writeLong(blockOffsets.get(i));
1576         out.writeInt(onDiskDataSizes.get(i));
1577         out.write(blockKeys.get(i));
1578       }
1579     }
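         // Resulting on-disk layout (illustrative: two entries with 3- and
         // 5-byte first keys):
         //
         //   int  numEntries           = 2
         //   int  secondary index [0]  = 0             // entry #0 starts at 0
         //   int  secondary index [1]  = 12 + 3 = 15   // entry #1 starts at 15
         //   int  total entry bytes    = 15 + 17 = 32  // the extra element
         //   entry #0: long blockOffset, int onDiskDataSize, 3 key bytes
         //   entry #1: long blockOffset, int onDiskDataSize, 5 key bytes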
1580 
1581     /**
1582      * @return the size of this chunk if stored in the non-root index block
1583      *         format
1584      */
1585     int getNonRootSize() {
1586       return Bytes.SIZEOF_INT                          // Number of entries
1587           + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1588           + curTotalNonRootEntrySize;                  // All entries
1589     }
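         // For the two-entry example above: 4 + 4 * (2 + 1) + 32 = 48 bytes,
         // exactly the number of bytes writeNonRoot(DataOutput) emits.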
1590 
1591     /**
1592      * Writes this chunk into the given output stream in the root block index
1593      * format. This format is similar to the {@link HFile} version 1 block
1594      * index format, except that we store on-disk size of the block instead of
1595      * its uncompressed size.
1596      *
1597      * @param out the data output stream to write the block index to. Typically
1598      *          a stream writing into an {@link HFile} block.
1599      * @throws IOException if the write fails
1600      */
1601     void writeRoot(DataOutput out) throws IOException {
1602       for (int i = 0; i < blockKeys.size(); ++i) {
1603         out.writeLong(blockOffsets.get(i));
1604         out.writeInt(onDiskDataSizes.get(i));
1605         Bytes.writeByteArray(out, blockKeys.get(i));
1606       }
1607     }
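         // Per-entry cost (illustrative, 20-byte key): 8 (long offset) + 4 (int
         // onDiskDataSize) + 1 (vInt length written by Bytes.writeByteArray) +
         // 20 key bytes = 33, matching the curTotalRootSize accounting in
         // add(). Unlike the non-root format there is no secondary index; the
         // root chunk is small and is deserialized in full when the file is
         // opened.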
1608 
1609     /**
1610      * @return the size of this chunk if stored in the root index block format
1611      */
1612     int getRootSize() {
1613       return curTotalRootSize;
1614     }
1615 
1616     /**
1617      * @return the number of entries in this block index chunk
1618      */
1619     public int getNumEntries() {
1620       return blockKeys.size();
1621     }
1622 
1623     public byte[] getBlockKey(int i) {
1624       return blockKeys.get(i);
1625     }
1626 
1627     public long getBlockOffset(int i) {
1628       return blockOffsets.get(i);
1629     }
1630 
1631     public int getOnDiskDataSize(int i) {
1632       return onDiskDataSizes.get(i);
1633     }
1634 
1635     public long getCumulativeNumKV(int i) {
1636       if (i < 0)
1637         return 0;
1638       return numSubEntriesAt.get(i);
1639     }
1640 
1641   }
1642 
1643   public static int getMaxChunkSize(Configuration conf) {
1644     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1645   }
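       // A minimal usage sketch (values illustrative) showing how the limit can
       // be overridden through the configuration key defined above:
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
       //   assert HFileBlockIndex.getMaxChunkSize(conf) == 64 * 1024;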
1646 }