1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FSDataOutputStream;
37  import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellComparator;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
43  import org.apache.hadoop.hbase.classification.InterfaceAudience;
44  import org.apache.hadoop.hbase.io.HeapSize;
45  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
46  import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
47  import org.apache.hadoop.hbase.nio.ByteBuff;
48  import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.ClassSize;
51  import org.apache.hadoop.hbase.util.ObjectIntPair;
52  import org.apache.hadoop.io.WritableUtils;
53  import org.apache.hadoop.util.StringUtils;
54  
55  /**
56   * Provides functionality to write ({@link BlockIndexWriter}) and read
57   * ({@link BlockIndexReader})
58   * single-level and multi-level block indexes.
59   *
60   * Examples of how to use the block index writer can be found in
61   * {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
62   *  {@link HFileWriterImpl}. Examples of how to use the reader can be
63   *  found in {@link HFileReaderImpl} and
64   *  {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
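     *
     * <p>A minimal read-side sketch (illustrative only; {@code comparator},
     * {@code numLevels}, {@code rootIndexBlock}, {@code numEntries} and
     * {@code cachingBlockReader} are assumed to come from the HFile trailer
     * and reader set-up, and {@code key} is the Cell being looked up):
     * <pre>{@code
     * BlockIndexReader idx =
     *     new CellBasedKeyBlockIndexReader(comparator, numLevels, cachingBlockReader);
     * idx.readMultiLevelIndexRoot(rootIndexBlock, numEntries);
     * HFileBlock dataBlock = idx.seekToDataBlock(key, null, true, true, false, null);
     * }</pre>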
65   */
66  @InterfaceAudience.Private
67  public class HFileBlockIndex {
68  
69    private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
70  
71    static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
72  
73    /**
74   * The maximum size guideline for index blocks (leaf, intermediate, and
75     * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
76     */
77    public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
78  
79    /**
80     * Minimum number of entries in a single index block. Even if we are above the
81    * hfile.index.block.max.size, we will keep writing to the same block unless we have that many
82     * entries. We should have at least a few entries so that we don't have too many levels in the
83     * multi-level index. This should be at least 2 to make sure there is no infinite recursion.
84     */
85    public static final String MIN_INDEX_NUM_ENTRIES_KEY = "hfile.index.block.min.entries";
86
87    static final int DEFAULT_MIN_INDEX_NUM_ENTRIES = 16;
88
89    /**
90     * The number of bytes stored in each "secondary index" entry in addition to
91     * key bytes in the non-root index block format. The first long is the file
92     * offset of the deeper-level block the entry points to, and the int that
93     * follows is that block's on-disk size without including header.
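    * Schematically, each non-root entry is laid out as
    * <pre>[long: deeper block's file offset][int: on-disk size w/o header][key bytes]</pre>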
94     */
95    static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
96        + Bytes.SIZEOF_LONG;
97
98    /**
99     * Error message when trying to use inline block API in single-level mode.
100    */
101   private static final String INLINE_BLOCKS_NOT_ALLOWED =
102       "Inline blocks are not allowed in the single-level-only mode";
103
104   /**
105    * The size of a meta-data record used for finding the mid-key in a
106    * multi-level index. Consists of the middle leaf-level index block offset
107    * (long), its on-disk size without header included (int), and the mid-key
108    * entry's zero-based index in that leaf index block.
109    */
110   private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
111       2 * Bytes.SIZEOF_INT;
112
113   /**
114    * An implementation of the BlockIndexReader that deals with block keys which are plain
115    * byte[], like the meta block index or the bloom block index for ROW blooms.
116    * Does not need a CellComparator; it works on Bytes.BYTES_RAWCOMPARATOR ordering.
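    * <p>A usage sketch (illustrative only; {@code metaIndexBlock},
    * {@code numMetaEntries} and {@code nameBytes} stand in for values that the
    * HFile reader obtains from the load-on-open section):
    * <pre>{@code
    * ByteArrayKeyBlockIndexReader metaIdx = new ByteArrayKeyBlockIndexReader(1);
    * metaIdx.readRootIndex(metaIndexBlock.getByteStream(), numMetaEntries);
    * int i = metaIdx.rootBlockContainingKey(nameBytes, 0, nameBytes.length);
    * }</pre>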
117    */
118    static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {
119
120     private byte[][] blockKeys;
121
122     public ByteArrayKeyBlockIndexReader(final int treeLevel,
123         final CachingBlockReader cachingBlockReader) {
124       this(treeLevel);
125       this.cachingBlockReader = cachingBlockReader;
126     }
127
128     public ByteArrayKeyBlockIndexReader(final int treeLevel) {
129       // No comparator needed here; raw byte[] ordering is used (e.g. for the meta index)
130       searchTreeLevel = treeLevel;
131     }
132
133     @Override
134     protected long calculateHeapSizeForBlockKeys(long heapSize) {
135       // Calculating the size of blockKeys
136       if (blockKeys != null) {
137         heapSize += ClassSize.REFERENCE;
138         // Adding array + references overhead
139         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
140
141         // Adding bytes
142         for (byte[] key : blockKeys) {
143           heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
144         }
145       }
146       return heapSize;
147     }
148
149     @Override
150     public boolean isEmpty() {
151       return blockKeys.length == 0;
152     }
153
154     /**
155      * @param i
156      *          from 0 to {@link #getRootBlockCount()} - 1
157      */
158     public byte[] getRootBlockKey(int i) {
159       return blockKeys[i];
160     }
161
162     @Override
163     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
164         boolean cacheBlocks, boolean pread, boolean isCompaction,
165         DataBlockEncoding expectedDataBlockEncoding) throws IOException {
166       // Not needed for this reader; it is never used to look up data blocks
167       return null;
168     }
169
170     @Override
171     public Cell midkey() throws IOException {
172       // Not needed here
173       return null;
174     }
175 
176     @Override
177     protected void initialize(int numEntries) {
178       blockKeys = new byte[numEntries][];
179     }
180
181     @Override
182     protected void add(final byte[] key, final long offset, final int dataSize) {
183       blockOffsets[rootCount] = offset;
184       blockKeys[rootCount] = key;
185       blockDataSizes[rootCount] = dataSize;
186       rootCount++;
187     }
188
189     @Override
190     public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
191       int pos = Bytes.binarySearch(blockKeys, key, offset, length);
192       // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
193       // binarySearch's javadoc.
194
195       if (pos >= 0) {
196         // This means this is an exact match with an element of blockKeys.
197         assert pos < blockKeys.length;
198         return pos;
199       }
200
201       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
202       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
203       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
204       // key < blockKeys[0], meaning the file does not contain the given key.
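          // Illustrative example: with blockKeys = {"b", "d"} and key = "c",
          // binarySearch returns pos = -2, so i = 1 and we return 0: the block
          // whose first key is "b" is the one that could contain "c".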
205 
206       int i = -pos - 1;
207       assert 0 <= i && i <= blockKeys.length;
208       return i - 1;
209     }
210
211     @Override
212     public int rootBlockContainingKey(Cell key) {
213       // Should not be called on this because here it deals only with byte[]
214       throw new UnsupportedOperationException(
215           "Cannot search for a key that is of Cell type. Only plain byte array keys " +
216           "can be searched for");
217     }
218
219     @Override
220     public String toString() {
221       StringBuilder sb = new StringBuilder();
222       sb.append("size=" + rootCount).append("\n");
223       for (int i = 0; i < rootCount; i++) {
224         sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
225             .append("\n  offset=").append(blockOffsets[i])
226             .append(", dataSize=" + blockDataSizes[i]).append("\n");
227       }
228       return sb.toString();
229     }
230
231   }
232 
233   /**
234    * An implementation of the BlockIndexReader that deals with block keys which are the key
235    * part of a cell, like the data block index or the ROW_COL bloom blocks.
236    * This needs a CellComparator to work with the Cells.
237    */
238    static class CellBasedKeyBlockIndexReader extends BlockIndexReader {
239
240     private Cell[] blockKeys;
241     /** Pre-computed mid-key */
242     private AtomicReference<Cell> midKey = new AtomicReference<Cell>();
243     /** Needed when doing lookups on blocks. */
244     private CellComparator comparator;
245
246     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel,
247         final CachingBlockReader cachingBlockReader) {
248       this(c, treeLevel);
249       this.cachingBlockReader = cachingBlockReader;
250     }
251
252     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
253       // Can be null for METAINDEX block
254       comparator = c;
255       searchTreeLevel = treeLevel;
256     }
257
258     @Override
259     protected long calculateHeapSizeForBlockKeys(long heapSize) {
260       if (blockKeys != null) {
261         heapSize += ClassSize.REFERENCE;
262         // Adding array + references overhead
263         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
264
265         // Adding blockKeys
266         for (Cell key : blockKeys) {
267           heapSize += ClassSize.align(CellUtil.estimatedHeapSizeOf(key));
268         }
269       }
270       // Add comparator and the midkey atomicreference
271       heapSize += 2 * ClassSize.REFERENCE;
272       return heapSize;
273     }
274
275     @Override
276     public boolean isEmpty() {
277       return blockKeys.length == 0;
278     }
279
280     /**
281      * @param i
282      *          from 0 to {@link #getRootBlockCount()} - 1
283      */
284     public Cell getRootBlockKey(int i) {
285       return blockKeys[i];
286     }
287
288     @Override
289     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
290         boolean cacheBlocks, boolean pread, boolean isCompaction,
291         DataBlockEncoding expectedDataBlockEncoding) throws IOException {
292       int rootLevelIndex = rootBlockContainingKey(key);
293       if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
294         return null;
295       }
296
297       // the next indexed key
298       Cell nextIndexedKey = null;
299
300       // Read the next-level (intermediate or leaf) index block.
301       long currentOffset = blockOffsets[rootLevelIndex];
302       int currentOnDiskSize = blockDataSizes[rootLevelIndex];
303
304       if (rootLevelIndex < blockKeys.length - 1) {
305         nextIndexedKey = blockKeys[rootLevelIndex + 1];
306       } else {
307         nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
308       }
309
310       int lookupLevel = 1; // How many levels deep we are in our lookup.
311       int index = -1;
312
313       HFileBlock block = null;
314       boolean dataBlock = false;
315       KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
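          // Walk down the tree one block per iteration: any intermediate index
          // blocks, then the leaf index block, then the data block itself; the
          // data block is reached when lookupLevel == searchTreeLevel.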
316       while (true) {
317         try {
318           if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
319             // Avoid reading the same block again, even with caching turned off.
320             // This is crucial for compaction-type workload which might have
321             // caching turned off. This is like a one-block cache inside the
322             // scanner.
323             block = currentBlock;
324           } else {
325             // Call HFile's caching block reader API. We always cache index
326             // blocks, otherwise we might get terrible performance.
327             boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
328             BlockType expectedBlockType;
329             if (lookupLevel < searchTreeLevel - 1) {
330               expectedBlockType = BlockType.INTERMEDIATE_INDEX;
331             } else if (lookupLevel == searchTreeLevel - 1) {
332               expectedBlockType = BlockType.LEAF_INDEX;
333             } else {
334               // this also accounts for ENCODED_DATA
335               expectedBlockType = BlockType.DATA;
336             }
337             block =
338                 cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
339                   isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
340           }
341
342           if (block == null) {
343             throw new IOException("Failed to read block at offset " + currentOffset
344                 + ", onDiskSize=" + currentOnDiskSize);
345           }
346
347           // Found a data block, break the loop and check our level in the tree.
348           if (block.getBlockType().isData()) {
349             dataBlock = true;
350             break;
351           }
352
353           // Not a data block. This must be a leaf-level or intermediate-level
354           // index block. We don't allow going deeper than searchTreeLevel.
355           if (++lookupLevel > searchTreeLevel) {
356             throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
357                 + ", searchTreeLevel=" + searchTreeLevel);
358           }
359
360           // Locate the entry corresponding to the given key in the non-root
361           // (leaf or intermediate-level) index block.
362           ByteBuff buffer = block.getBufferWithoutHeader();
363           index = locateNonRootIndexEntry(buffer, key, comparator);
364           if (index == -1) {
365             // The key sorts before every key in this non-root index block,
366             // which should not happen during a well-formed index lookup.
367             throw new IOException("The key "
368                 + CellUtil.getCellKeyAsString(key)
369                 + " is before the first key of the non-root index block " + block);
370           }
371
372           currentOffset = buffer.getLong();
373           currentOnDiskSize = buffer.getInt();
374
375           // Only update next indexed key if there is a next indexed key in the current level
376           byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
377           if (nonRootIndexedKey != null) {
378             tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
379             nextIndexedKey = tmpNextIndexKV;
380           }
381         } finally {
382           if (!dataBlock) {
383             // Return the block immediately if it is not the
384             // data block
385             cachingBlockReader.returnBlock(block);
386           }
387         }
388       }
389
390       if (lookupLevel != searchTreeLevel) {
391         assert dataBlock;
392         // We did reach a data block, but at the wrong depth in the tree.
393         // Return the block so that its ref count can be decremented before
394         // we report the error.
395         cachingBlockReader.returnBlock(block);
396         throw new IOException("Reached a data block at level " + lookupLevel +
397             " but the number of levels is " + searchTreeLevel);
398       }
399
400       // set the next indexed key for the current block.
401       BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
402       return blockWithScanInfo;
403     }
404
405     @Override
406     public Cell midkey() throws IOException {
407       if (rootCount == 0)
408         throw new IOException("HFile empty");
409
410       Cell targetMidKey = this.midKey.get();
411       if (targetMidKey != null) {
412         return targetMidKey;
413       }
414
415       if (midLeafBlockOffset >= 0) {
416         if (cachingBlockReader == null) {
417           throw new IOException("Have to read the middle leaf block but " +
418               "no block reader available");
419         }
420
421         // Caching, using pread, assuming this is not a compaction.
422         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
423             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
424             BlockType.LEAF_INDEX, null);
425         try {
426           ByteBuff b = midLeafBlock.getBufferWithoutHeader();
427           int numDataBlocks = b.getIntAfterPosition(0);
428           int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
429           int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset;
430           int keyOffset =
431               Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
432                   + SECONDARY_INDEX_ENTRY_OVERHEAD;
433           byte[] bytes = b.toBytes(keyOffset, keyLen);
434           targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
435         } finally {
436           cachingBlockReader.returnBlock(midLeafBlock);
437         }
438       } else {
439         // The middle of the root-level index.
440         targetMidKey = blockKeys[rootCount / 2];
441       }
442
443       this.midKey.set(targetMidKey);
444       return targetMidKey;
445     }
446
447     @Override
448     protected void initialize(int numEntries) {
449       blockKeys = new Cell[numEntries];
450     }
451
452     /**
453      * Adds a new entry in the root block index. Only used when reading.
454      *
455      * @param key Last key in the block
456      * @param offset file offset where the block is stored
457      * @param dataSize the on-disk data size (version 2) or the uncompressed data size (version 1)
458      */
459     @Override
460     protected void add(final byte[] key, final long offset, final int dataSize) {
461       blockOffsets[rootCount] = offset;
462       // Create the blockKeys as Cells once when the reader is opened
463       blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
464       blockDataSizes[rootCount] = dataSize;
465       rootCount++;
466     }
467
468     @Override
469     public int rootBlockContainingKey(final byte[] key, int offset, int length,
470         CellComparator comp) {
471       // This should always be called with a Cell, not with a byte[] key
472       throw new UnsupportedOperationException("Cannot search for a plain byte array key. " +
473           "Only Cell-based keys can be searched for");
474     }
475
476     @Override
477     public int rootBlockContainingKey(Cell key) {
478       // Here the comparator should not be null as this happens for the root-level block
479       int pos = Bytes.binarySearch(blockKeys, key, comparator);
480       // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
481       // binarySearch's javadoc.
482
483       if (pos >= 0) {
484         // This means this is an exact match with an element of blockKeys.
485         assert pos < blockKeys.length;
486         return pos;
487       }
488
489       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
490       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
491       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
492       // key < blockKeys[0], meaning the file does not contain the given key.
493
494       int i = -pos - 1;
495       assert 0 <= i && i <= blockKeys.length;
496       return i - 1;
497     }
498
499     @Override
500     public String toString() {
501       StringBuilder sb = new StringBuilder();
502       sb.append("size=" + rootCount).append("\n");
503       for (int i = 0; i < rootCount; i++) {
504         sb.append("key=").append((blockKeys[i]))
505             .append("\n  offset=").append(blockOffsets[i])
506             .append(", dataSize=" + blockDataSizes[i]).append("\n");
507       }
508       return sb.toString();
509     }
510   }
511    /**
512    * The reader will always hold the root level index in memory. Index
513    * blocks at all other levels will be cached in the LRU cache in practice,
514    * although this API does not enforce that.
515    *
516    * All non-root (leaf and intermediate) index blocks contain what we call a
517    * "secondary index": an array of offsets to the entries within the block.
518    * This allows us to do binary search for the entry corresponding to the
519    * given key without having to deserialize the block.
520    */
521    static abstract class BlockIndexReader implements HeapSize {
522
523     protected long[] blockOffsets;
524     protected int[] blockDataSizes;
525     protected int rootCount = 0;
526
527     // Mid-key metadata.
528     protected long midLeafBlockOffset = -1;
529     protected int midLeafBlockOnDiskSize = -1;
530     protected int midKeyEntry = -1;
531
532     /**
533      * The number of levels in the block index tree. One if there is only root
534      * level, two for root and leaf levels, etc.
535      */
536     protected int searchTreeLevel;
537
538     /** A way to read {@link HFile} blocks at a given offset */
539     protected CachingBlockReader cachingBlockReader;
540
541     /**
542      * @return true if the block index is empty.
543      */
544     public abstract boolean isEmpty();
545
546     /**
547      * Verifies that the block index is non-empty and throws an
548      * {@link IllegalStateException} otherwise.
549      */
550     public void ensureNonEmpty() {
551       if (isEmpty()) {
552         throw new IllegalStateException("Block index is empty or not loaded");
553       }
554     }
555
556     /**
557      * Return the data block which contains this key. This function will only
558      * be called when the HFile version is larger than 1.
559      *
560      * @param key the key we are looking for
561      * @param currentBlock the current block, to avoid re-reading the same block
562      * @param cacheBlocks whether to cache the blocks that are read
563      * @param pread whether to use positional read
564      * @param isCompaction whether the read is part of a compaction
565      * @param expectedDataBlockEncoding the data block encoding the caller is
566      *          expecting the data block to be in, or null to not perform this
567      *          check and return the block irrespective of the encoding
568      * @return the data block containing the given key, or null if not found
569      * @throws IOException
570      */
571     public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
572         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
573         throws IOException {
574       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
575           cacheBlocks,
576           pread, isCompaction, expectedDataBlockEncoding);
577       if (blockWithScanInfo == null) {
578         return null;
579       } else {
580         return blockWithScanInfo.getHFileBlock();
581       }
582     }
583
584     /**
585      * Return the BlockWithScanInfo which contains the DataBlock with other scan
586      * info such as nextIndexedKey. This function will only be called when the
587      * HFile version is larger than 1.
588      *
589      * @param key
590      *          the key we are looking for
591      * @param currentBlock
592      *          the current block, to avoid re-reading the same block
593      * @param cacheBlocks whether to cache the blocks that are read
594      * @param pread whether to use positional read
595      * @param isCompaction whether the read is part of a compaction
596      * @param expectedDataBlockEncoding the data block encoding the caller is
597      *          expecting the data block to be in, or null to not perform this
598      *          check and return the block irrespective of the encoding.
599      * @return the BlockWithScanInfo which contains the DataBlock with other
600      *         scan info such as nextIndexedKey.
601      * @throws IOException
602      */
603     public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
604         boolean cacheBlocks,
605         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
606         throws IOException;
607
608     /**
609      * An approximation to the {@link HFile}'s mid-key. Operates on block
610      * boundaries, and does not go inside blocks. In other words, returns the
611      * first key of the middle block of the file.
612      *
613      * @return the first key of the middle block
614      */
615     public abstract Cell midkey() throws IOException;
616
617     /**
618      * @param i from 0 to {@link #getRootBlockCount()} - 1
619      */
620     public long getRootBlockOffset(int i) {
621       return blockOffsets[i];
622     }
623
624     /**
625      * @param i zero-based index of a root-level block
626      * @return the on-disk size of the root-level block for version 2, or the
627      *         uncompressed size for version 1
628      */
629     public int getRootBlockDataSize(int i) {
630       return blockDataSizes[i];
631     }
632
633     /**
634      * @return the number of root-level blocks in this block index
635      */
636     public int getRootBlockCount() {
637       return rootCount;
638     }
639
640     /**
641      * Finds the root-level index block containing the given key.
642      *
643      * @param key
644      *          Key to find
645      * @param comp
646      *          the comparator to be used
647      * @return Index of the root-level block containing <code>key</code>
648      *         (between 0 and the number of blocks - 1), or -1 if this file
649      *         does not contain the requested key.
650      */
651     // When we want to find the meta index block or bloom block for ROW bloom
652     // type, Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
653     // CellComparator.
654     public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
655         CellComparator comp);
656
657     /**
658      * Finds the root-level index block containing the given key.
659      *
660      * @param key
661      *          Key to find
662      * @return Index of the root-level block containing <code>key</code>
663      *         (between 0 and the number of blocks - 1), or -1 if this file
664      *         does not contain the requested key.
665      */
666     // When we want to find the meta index block or bloom block for ROW bloom
667     // type,
668     // Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we
669     // need the CellComparator.
670     public int rootBlockContainingKey(final byte[] key, int offset, int length) {
671       return rootBlockContainingKey(key, offset, length, null);
672     }
673
674     /**
675      * Finds the root-level index block containing the given key.
676      *
677      * @param key
678      *          Key to find
679      */
680     public abstract int rootBlockContainingKey(final Cell key);
681
682     /**
683      * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
684      * @param nonRootIndex the non-root index block buffer
685      * @param i the ith position
686      * @return The indexed key at the ith position in the nonRootIndex.
687      */
688     protected byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
689       int numEntries = nonRootIndex.getInt(0);
690       if (i < 0 || i >= numEntries) {
691         return null;
692       }
693
694       // Entries start after the number of entries and the secondary index.
695       // The secondary index takes numEntries + 1 ints.
696       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
697       // Targetkey's offset relative to the end of secondary index
698       int targetKeyRelOffset = nonRootIndex.getInt(
699           Bytes.SIZEOF_INT * (i + 1));
700
701       // The offset of the target key in the blockIndex buffer
702       int targetKeyOffset = entriesOffset     // Skip secondary index
703           + targetKeyRelOffset               // Skip all entries until mid
704           + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
705
706       // We subtract the two consecutive secondary index elements, which
707       // gives us the size of the whole (offset, onDiskSize, key) tuple. We
708       // then need to subtract the overhead of offset and onDiskSize.
709       int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
710         targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
711
712       // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
713       return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
714     }
715
716     /**
717      * Performs a binary search over a non-root level index block. Utilizes the
718      * secondary index, which records the offsets of (offset, onDiskSize,
719      * firstKey) tuples of all entries.
720      *
721      * @param key
722      *          the key we are searching for offsets to individual entries in
723      *          the blockIndex buffer
724      * @param nonRootIndex
725      *          the non-root index block buffer, starting with the secondary
726      *          index. The position is ignored.
727      * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
728      *         keys[i + 1], where keys is the array of all keys being searched,
729      *         or -1 if the key is smaller than all of them
730      * @throws IOException
731      */
732     static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
733         CellComparator comparator) {
734
735       int numEntries = nonRootIndex.getIntAfterPosition(0);
736       int low = 0;
737       int high = numEntries - 1;
738       int mid = 0;
739
740       // Entries start after the number of entries and the secondary index.
741       // The secondary index takes numEntries + 1 ints.
742       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
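          // E.g. with numEntries = 3 the block starts with 4 bytes for the
          // count, then (3 + 1) * 4 = 16 bytes of secondary-index offsets, so
          // the first entry begins at byte 4 * (3 + 2) = 20.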
743
744       // If we imagine that keys[-1] = -Infinity and
745       // keys[numEntries] = Infinity, then we are maintaining an invariant that
746       // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
747       ByteBufferedKeyOnlyKeyValue nonRootIndexkeyOnlyKV = new ByteBufferedKeyOnlyKeyValue();
748       ObjectIntPair<ByteBuffer> pair = new ObjectIntPair<ByteBuffer>();
749       while (low <= high) {
750         mid = (low + high) >>> 1;
751
752         // Midkey's offset relative to the end of secondary index
753         int midKeyRelOffset = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 1));
754
755         // The offset of the middle key in the blockIndex buffer
756         int midKeyOffset = entriesOffset       // Skip secondary index
757             + midKeyRelOffset                  // Skip all entries until mid
758             + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
759
760         // We subtract the two consecutive secondary index elements, which
761         // gives us the size of the whole (offset, onDiskSize, key) tuple. We
762         // then need to subtract the overhead of offset and onDiskSize.
763         int midLength = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 2)) -
764             midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
765
766         // we have to compare in this order, because the comparator order
767         // has special logic when the 'left side' is a special key.
768         // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
769         // done after HBASE-12224 & HBASE-12282
770         // TODO avoid array call.
771         nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
772         nonRootIndexkeyOnlyKV.setKey(pair.getFirst(), pair.getSecond(), midLength);
773         int cmp = comparator.compareKeyIgnoresMvcc(key, nonRootIndexkeyOnlyKV);
774
775         // key lives above the midpoint
776         if (cmp > 0)
777           low = mid + 1; // Maintain the invariant that keys[low - 1] < key
778         // key lives below the midpoint
779         else if (cmp < 0)
780           high = mid - 1; // Maintain the invariant that key < keys[high + 1]
781         else
782           return mid; // exact match
783       }
784
785       // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
786       // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
787       // condition, low >= high + 1. Therefore, low = high + 1.
788
789       if (low != high + 1) {
790         throw new IllegalStateException("Binary search broken: low=" + low
791             + " " + "instead of " + (high + 1));
792       }
793
794       // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
795       // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
796       int i = low - 1;
797
798       // Some extra validation on the result.
799       if (i < -1 || i >= numEntries) {
800         throw new IllegalStateException("Binary search broken: result is " +
801             i + " but expected to be between -1 and (numEntries - 1) = " +
802             (numEntries - 1));
803       }
804
805       return i;
806     }
807
808     /**
809      * Search for one key using the secondary index in a non-root block. In case
810      * of success, positions the provided buffer at the entry of interest, where
811      * the file offset and the on-disk-size can be read.
812      *
813      * @param nonRootBlock
814      *          a non-root block without header. Initial position does not
815      *          matter.
816      * @param key
817      *          the key to search for
818      * @return the index position where the given key was found, or -1 if the
819      *         given key is before the first key.
820      *
821      */
822     static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
823         CellComparator comparator) {
824       int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
825
826       if (entryIndex != -1) {
827         int numEntries = nonRootBlock.getIntAfterPosition(0);
828
829         // The end of secondary index and the beginning of entries themselves.
830         int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
831
832         // The offset of the entry we are interested in relative to the end of
833         // the secondary index.
834         int entryRelOffset = nonRootBlock
835             .getIntAfterPosition(Bytes.SIZEOF_INT * (1 + entryIndex));
836
837         nonRootBlock.position(entriesOffset + entryRelOffset);
838       }
839
840       return entryIndex;
841     }
842
843     /**
844      * Read in the root-level index from the given input stream. Must match
845      * what was written into the root level by
846      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
847      * offset that function returned.
848      *
849      * @param in the buffered input stream or wrapped byte input stream
850      * @param numEntries the number of root-level index entries
851      * @throws IOException
852      */
853     public void readRootIndex(DataInput in, final int numEntries) throws IOException {
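          // Each root-level entry below consists of a long block offset, an
          // int on-disk size, and the key as a length-prefixed byte array
          // (the format Bytes.readByteArray expects).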
854       blockOffsets = new long[numEntries];
855       initialize(numEntries);
856       blockDataSizes = new int[numEntries];
857
858       // If index size is zero, no index was written.
859       if (numEntries > 0) {
860         for (int i = 0; i < numEntries; ++i) {
861           long offset = in.readLong();
862           int dataSize = in.readInt();
863           byte[] key = Bytes.readByteArray(in);
864           add(key, offset, dataSize);
865         }
866       }
867     }
868
869     protected abstract void initialize(int numEntries);
870
871     protected abstract void add(final byte[] key, final long offset, final int dataSize);
872
873     /**
874      * Read in the root-level index from the given input stream. Must match
875      * what was written into the root level by
876      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
877      * offset that function returned.
878      *
879      * @param blk the HFile block
880      * @param numEntries the number of root-level index entries
881      * @return the buffered input stream or wrapped byte input stream
882      * @throws IOException
883      */
884     public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
885       DataInputStream in = blk.getByteStream();
886       readRootIndex(in, numEntries);
887       return in;
888     }
889
890     /**
891      * Read the root-level metadata of a multi-level block index. Based on
892      * {@link #readRootIndex(DataInput, int)}, but also reads metadata
893      * necessary to compute the mid-key in a multi-level index.
894      *
895      * @param blk the HFile block
896      * @param numEntries the number of root-level index entries
897      * @throws IOException
898      */
899     public void readMultiLevelIndexRoot(HFileBlock blk,
900         final int numEntries) throws IOException {
901       DataInputStream in = readRootIndex(blk, numEntries);
902       // after reading the root index the checksum bytes have to
903       // be subtracted to know if the mid key exists.
904       int checkSumBytes = blk.totalChecksumBytes();
905       if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
906         // No mid-key metadata available.
907         return;
908       }
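          // Mid-key metadata (see MID_KEY_METADATA_SIZE): offset of the middle
          // leaf-level block, its on-disk size without header, and the mid-key
          // entry's index within that block.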
909       midLeafBlockOffset = in.readLong();
910       midLeafBlockOnDiskSize = in.readInt();
911       midKeyEntry = in.readInt();
912     }
913
914     @Override
915     public long heapSize() {
916       // Counts only this base class's fields; blockKeys, the comparator and
          // the midkey reference are accounted for in calculateHeapSizeForBlockKeys
917       long heapSize = ClassSize.align(3 * ClassSize.REFERENCE +
918           2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
919
920       // Mid-key metadata.
921       heapSize += MID_KEY_METADATA_SIZE;
922
923       heapSize = calculateHeapSizeForBlockKeys(heapSize);
924
925       if (blockOffsets != null) {
926         heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
927             * Bytes.SIZEOF_LONG);
928       }
929
930       if (blockDataSizes != null) {
931         heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
932             * Bytes.SIZEOF_INT);
933       }
934
935       return ClassSize.align(heapSize);
936     }
937
938     protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
939   }
940
941   /**
942    * Writes the block index into the output stream. Generates the tree from
943    * the bottom up. The leaf level is written to disk as a sequence of inline
944    * blocks, if it is larger than a certain number of bytes. If the leaf level
945    * is not large enough, we write all entries to the root level instead.
946    *
947    * After all leaf blocks have been written, we end up with an index
948    * referencing the resulting leaf index blocks. If that index is larger than
949    * the allowed root index size, the writer will break it up into
950    * reasonable-size intermediate-level index block chunks, write those chunks
951    * out, and create another index referencing those chunks. This will be
952    * repeated until the remaining index is small enough to become the root
953    * index. However, in most practical cases we will only have leaf-level
954    * blocks and the root index, or just the root index.
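    *
    * <p>A condensed write-side sketch (illustrative only; in practice
    * {@link HFileWriterImpl} drives these calls, and {@code blockWriter},
    * {@code cacheConf}, {@code fileName} and {@code out} are assumed to be
    * set up by the caller):
    * <pre>{@code
    * BlockIndexWriter iw = new BlockIndexWriter(blockWriter, cacheConf, fileName);
    * iw.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize); // once per data block
    * // ... leaf chunks are flushed through the InlineBlockWriter callbacks ...
    * long rootIndexOffset = iw.writeIndexBlocks(out); // when closing the file
    * }</pre>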
955    */
956   public static class BlockIndexWriter implements InlineBlockWriter {
957     /**
958      * While the index is being written, this represents the current block
959      * index referencing all leaf blocks, with one exception. If the file is
960      * being closed and there are not enough blocks to complete even a single
961      * leaf block, no leaf blocks get written and this contains the entire
962      * block index. After all levels of the index were written by
963      * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
964      * root-level index.
965      */
966     private BlockIndexChunk rootChunk = new BlockIndexChunk();
967
968     /**
969      * Current leaf-level chunk. New entries referencing data blocks get added
970      * to this chunk until it grows large enough to be written to disk.
971      */
972     private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
973
974     /**
975      * The number of block index levels. This is one if there is only root
976    * level (even empty), two if there is a leaf level and a root level, and is
977      * higher if there are intermediate levels. This is only final after
978      * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
979      * initial value accounts for the root level, and will be increased to two
980      * as soon as we find out there is a leaf-level in
981      * {@link #blockWritten(long, int, int)}.
982      */
983     private int numLevels = 1;
984
985     private HFileBlock.Writer blockWriter;
986     private byte[] firstKey = null;
987
988     /**
989      * The total number of leaf-level entries, i.e. entries referenced by
990      * leaf-level blocks. For the data block index this is equal to the number
991      * of data blocks.
992      */
993     private long totalNumEntries;
994
995     /** Total compressed size of all index blocks. */
996     private long totalBlockOnDiskSize;
997
998     /** Total uncompressed size of all index blocks. */
999     private long totalBlockUncompressedSize;
1000
1001     /** The maximum size guideline of all multi-level index blocks. */
1002     private int maxChunkSize;
1003
1004     /** The minimum number of entries to put in each multi-level index block */
1005     private int minIndexNumEntries;
1006
1007     /** Whether we require this block index to always be single-level. */
1008     private boolean singleLevelOnly;
1009
1010     /** CacheConfig, or null if cache-on-write is disabled */
1011     private CacheConfig cacheConf;
1012
1013     /** Name to use for computing cache keys */
1014     private String nameForCaching;
1015
1016     /** Creates a single-level block index writer */
1017     public BlockIndexWriter() {
1018       this(null, null, null);
1019       singleLevelOnly = true;
1020     }
1021
1022     /**
1023      * Creates a multi-level block index writer.
1024      *
1025      * @param blockWriter the block writer to use to write index blocks
1026      * @param cacheConf used to determine when and how a block should be cached-on-write.
          * @param nameForCaching the file name to use for composing block cache keys
1027      */
1028     public BlockIndexWriter(HFileBlock.Writer blockWriter,
1029         CacheConfig cacheConf, String nameForCaching) {
1030       if ((cacheConf == null) != (nameForCaching == null)) {
1031         throw new IllegalArgumentException("Block cache and file name for " +
1032             "caching must be both specified or both null");
1033       }
1034
1035       this.blockWriter = blockWriter;
1036       this.cacheConf = cacheConf;
1037       this.nameForCaching = nameForCaching;
1038       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
1039       this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
1040     }
1041
1042     public void setMaxChunkSize(int maxChunkSize) {
1043       if (maxChunkSize <= 0) {
1044         throw new IllegalArgumentException("Invalid maximum index block size");
1045       }
1046       this.maxChunkSize = maxChunkSize;
1047     }
1048
1049     public void setMinIndexNumEntries(int minIndexNumEntries) {
1050       if (minIndexNumEntries <= 1) {
1051         throw new IllegalArgumentException("Invalid minimum index block entries, should be >= 2");
1052       }
1053       this.minIndexNumEntries = minIndexNumEntries;
1054     }
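         // Note: the writer that owns this index is expected to wire these two
         // knobs from the MAX_CHUNK_SIZE_KEY and MIN_INDEX_NUM_ENTRIES_KEY
         // configuration settings declared above (an assumption about the
         // surrounding writer code, not enforced here).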
1055
1056     /**
1057      * Writes the root level and intermediate levels of the block index into
1058      * the output stream, generating the tree from bottom up. Assumes that the
1059      * leaf level has been inline-written to the disk if there is enough data
1060      * for more than one leaf block. We iterate by breaking the current level
1061      * of the block index, starting with the index of all leaf-level blocks,
1062      * into chunks small enough to be written to disk, and generating its parent
1063      * level, until we end up with a level small enough to become the root
1064      * level.
1065      *
1066      * If the leaf level is not large enough, there is no inline block index
1067      * anymore, so we only write that level of block index to disk as the root
1068      * level.
1069      *
1070      * @param out FSDataOutputStream
1071      * @return position at which we entered the root-level index.
1072      * @throws IOException
1073      */
1074     public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
1075       if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
1076         throw new IOException("Trying to write a multi-level block index, " +
1077             "but there are " + curInlineChunk.getNumEntries() + " entries in the " +
1078             "last inline chunk.");
1079       }
1080
1081       // We need to get mid-key metadata before we create intermediate
1082       // indexes and overwrite the root chunk.
1083       byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
1084           : null;
1085
1086       if (curInlineChunk != null) {
1087         while (rootChunk.getRootSize() > maxChunkSize
1088             // HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
1089             && rootChunk.getNumEntries() > minIndexNumEntries
1090             // Sanity check: we will not hit this, as (minIndexNumEntries ^ 16) blocks can be addressed
1091             && numLevels < 16) {
1092           rootChunk = writeIntermediateLevel(out, rootChunk);
1093           numLevels += 1;
1094         }
1095       }
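          // Each pass above replaces rootChunk with a (much smaller) index over
          // the intermediate-index blocks just written, until what remains is
          // small enough to serve as the root level.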
1096
1097       // write the root level
1098       long rootLevelIndexPos = out.getPos();
1099
1100       {
1101         DataOutput blockStream =
1102             blockWriter.startWriting(BlockType.ROOT_INDEX);
1103         rootChunk.writeRoot(blockStream);
1104         if (midKeyMetadata != null)
1105           blockStream.write(midKeyMetadata);
1106         blockWriter.writeHeaderAndData(out);
1107         if (cacheConf != null) {
1108           HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1109           cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
1110             rootLevelIndexPos, true, blockForCaching.getBlockType()), blockForCaching);
1111         }
1112       }
1113
1114       // Add root index block size
1115       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1116       totalBlockUncompressedSize +=
1117           blockWriter.getUncompressedSizeWithoutHeader();
1118
1119       if (LOG.isTraceEnabled()) {
1120         LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
1121           + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
1122           + " root-level entries, " + totalNumEntries + " total entries, "
1123           + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
1124           " on-disk size, "
1125           + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
1126           " total uncompressed size.");
1127       }
1128       return rootLevelIndexPos;
1129     }
1130
1131     /**
1132      * Writes the block index data as a single level only. Does not do any
1133      * block framing.
1134      *
1135      * @param out the buffered output stream to write the index to. Typically a
1136      *          stream writing into an {@link HFile} block.
1137      * @param description a short description of the index being written. Used
1138      *          in a log message.
1139      * @throws IOException
1140      */
1141     public void writeSingleLevelIndex(DataOutput out, String description)
1142         throws IOException {
1143       expectNumLevels(1);
1144
1145       if (!singleLevelOnly)
1146         throw new IOException("Single-level mode is turned off");
1147
1148       if (rootChunk.getNumEntries() > 0)
1149         throw new IOException("Root-level entries already added in " +
1150             "single-level mode");
1151
1152       rootChunk = curInlineChunk;
1153       curInlineChunk = new BlockIndexChunk();
1154
1155       if (LOG.isTraceEnabled()) {
1156         LOG.trace("Wrote a single-level " + description + " index with "
1157           + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
1158           + " bytes");
1159       }
1160       rootChunk.writeRoot(out);
1161     }
1162
1163     /**
1164      * Split the current level of the block index into intermediate index
1165      * blocks of permitted size and write those blocks to disk. Return the next
1166      * level of the block index referencing those intermediate-level blocks.
1167      *
1168      * @param out
1169      * @param currentLevel the current level of the block index, such as a
1170      *          chunk referencing all leaf-level index blocks
1171      * @return the parent level block index, which becomes the root index after
1172      *         a few (usually zero) iterations
1173      * @throws IOException
1174      */
1175     private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
1176         BlockIndexChunk currentLevel) throws IOException {
1177       // Entries referencing intermediate-level blocks we are about to create.
1178       BlockIndexChunk parent = new BlockIndexChunk();
1179
1180       // The current intermediate-level block index chunk.
1181       BlockIndexChunk curChunk = new BlockIndexChunk();
1182
1183       for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
1184         curChunk.add(currentLevel.getBlockKey(i),
1185             currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
1186
1187         // HBASE-16288: We have to have at least minIndexNumEntries(16) items in the index so that
1188         // we won't end up with too many levels for an index with very large rowKeys. Also, if the
1189         // first key is larger than maxChunkSize this will cause infinite recursion.
1190         if (i >= minIndexNumEntries && curChunk.getRootSize() >= maxChunkSize) {
1191           writeIntermediateBlock(out, parent, curChunk);
1192         }
1193       }
1194
1195       if (curChunk.getNumEntries() > 0) {
1196         writeIntermediateBlock(out, parent, curChunk);
1197       }
1198
1199       return parent;
1200     }
1201
1202     private void writeIntermediateBlock(FSDataOutputStream out,
1203         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
1204       long beginOffset = out.getPos();
1205       DataOutputStream dos = blockWriter.startWriting(
1206           BlockType.INTERMEDIATE_INDEX);
1207       curChunk.writeNonRoot(dos);
1208       byte[] curFirstKey = curChunk.getBlockKey(0);
1209       blockWriter.writeHeaderAndData(out);
1210
1211       if (getCacheOnWrite()) {
1212         HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1213         cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
1214           beginOffset, true, blockForCaching.getBlockType()), blockForCaching);
1215       }
1216
1217       // Add intermediate index block size
1218       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1219       totalBlockUncompressedSize +=
1220           blockWriter.getUncompressedSizeWithoutHeader();
1221
1222       // OFFSET is the beginning offset of the chunk of block index entries.
1223       // SIZE is the total byte size of the chunk of block index entries
1224       // + the secondary index size
1225       // FIRST_KEY is the first key in the chunk of block index
1226       // entries.
1227       parent.add(curFirstKey, beginOffset,
1228           blockWriter.getOnDiskSizeWithHeader());
1229
1230       // clear current block index chunk
1231       curChunk.clear();
1232       curFirstKey = null;
1233     }
1234
1235     /**
1236      * @return how many block index entries there are in the root level
1237      */
1238     public final int getNumRootEntries() {
1239       return rootChunk.getNumEntries();
1240     }
1241
1242     /**
1243      * @return the number of levels in this block index.
1244      */
1245     public int getNumLevels() {
1246       return numLevels;
1247     }
1248
1249     private void expectNumLevels(int expectedNumLevels) {
1250       if (numLevels != expectedNumLevels) {
1251         throw new IllegalStateException("Number of block index levels is "
1252             + numLevels + " but is expected to be " + expectedNumLevels);
1253       }
1254     }
1255
1256     /**
1257      * Whether there is an inline block ready to be written. In general, we
1258      * write a leaf-level index block as an inline block as soon as its size
1259      * as serialized in the non-root format reaches a certain threshold.
1260      */
1261     @Override
1262     public boolean shouldWriteBlock(boolean closing) {
1263       if (singleLevelOnly) {
1264         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1265       }
1266
1267       if (curInlineChunk == null) {
1268         throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
1269             "called with closing=true and then called again?");
1270       }
1271
1272       if (curInlineChunk.getNumEntries() == 0) {
1273         return false;
1274       }
1275
1276       // We do have some entries in the current inline chunk.
1277       if (closing) {
1278         if (rootChunk.getNumEntries() == 0) {
1279           // We did not add any leaf-level blocks yet. Instead of creating a
1280           // leaf level with one block, move these entries to the root level.
1281
1282           expectNumLevels(1);
1283           rootChunk = curInlineChunk;
1284           curInlineChunk = null;  // Disallow adding any more index entries.
1285           return false;
1286         }
1287
1288         return true;
1289       } else {
1290         return curInlineChunk.getNonRootSize() >= maxChunkSize;
1291       }
1292     }
1293
1294     /**
1295      * Write out the current inline index block. Inline blocks are non-root
1296      * blocks, so the non-root index format is used.
1297      *
1298      * @param out
1299      */
1300     @Override
1301     public void writeInlineBlock(DataOutput out) throws IOException {
1302       if (singleLevelOnly)
1303         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1304
1305       // Write the inline block index to the output stream in the non-root
1306       // index block format.
1307       curInlineChunk.writeNonRoot(out);
1308
1309       // Save the first key of the inline block so that we can add it to the
1310       // parent-level index.
1311       firstKey = curInlineChunk.getBlockKey(0);
1312
1313       // Start a new inline index block
1314       curInlineChunk.clear();
1315     }
1316
1317     /**
1318      * Called after an inline block has been written so that we can add an
1319      * entry referring to that block to the parent-level index.
1320      */
1321     @Override
1322     public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
1323       // Add leaf index block size
1324       totalBlockOnDiskSize += onDiskSize;
1325       totalBlockUncompressedSize += uncompressedSize;
1326
1327       if (singleLevelOnly)
1328         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1329
1330       if (firstKey == null) {
1331         throw new IllegalStateException("Trying to add second-level index " +
1332             "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
1333             " but the first key was not set in writeInlineBlock");
1334       }
1335
1336       if (rootChunk.getNumEntries() == 0) {
1337         // We are writing the first leaf block, so increase index level.
1338         expectNumLevels(1);
1339         numLevels = 2;
1340       }
1341
1342       // Add another entry to the second-level index. Include the number of
1343       // entries in all previous leaf-level chunks for mid-key calculation.
1344       rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1345       firstKey = null;
1346     }
1347
1348     @Override
1349     public BlockType getInlineBlockType() {
1350       return BlockType.LEAF_INDEX;
1351     }
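
    // Illustrative usage sketch (not part of the original class): the flush
    // protocol that a caller such as HFileWriterImpl drives against this
    // InlineBlockWriter. The HFileBlock.Writer calls mirror real usage, but
    // this helper itself is hypothetical and simplified (no cache-on-write).
    static void exampleFlushInlineBlock(BlockIndexWriter ibw,
        HFileBlock.Writer blockWriter, FSDataOutputStream out) throws IOException {
      while (ibw.shouldWriteBlock(false)) { // a leaf chunk reached maxChunkSize
        long offset = out.getPos();         // where this block will start
        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(out); // encode, checksum, and flush
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
            blockWriter.getUncompressedSizeWithHeader());
      }
    }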
1352
1353     /**
1354      * Add one index entry to the current leaf-level block. When the leaf-level
1355      * block gets large enough, it will be flushed to disk as an inline block.
1356      *
1357      * @param firstKey the first key of the data block
1358      * @param blockOffset the offset of the data block
1359      * @param blockDataSize the on-disk size of the data block ({@link HFile}
1360      *          format version 2), or the uncompressed size of the data block (
1361      *          {@link HFile} format version 1).
1362      */
1363     public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
1364       curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1365       ++totalNumEntries;
1366     }
1367
1368     /**
1369      * @throws IOException if we happened to write a multi-level index.
1370      */
1371     public void ensureSingleLevel() throws IOException {
1372       if (numLevels > 1) {
1373         throw new IOException("Wrote a " + numLevels + "-level index with " +
1374             rootChunk.getNumEntries() + " root-level entries, but " +
1375             "this is expected to be a single-level block index.");
1376       }
1377     }
1378
1379     /**
1380      * @return true if we are using cache-on-write. This is configured by the
1381      *         caller of the constructor by either passing a valid cache
1382      *         configuration or null.
1383      */
1384     @Override
1385     public boolean getCacheOnWrite() {
1386       return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1387     }
1388
1389     /**
1390      * The total uncompressed size of the root index block, intermediate-level
1391      * index blocks, and leaf-level index blocks.
1392      *
1393      * @return the total uncompressed size of all index blocks
1394      */
1395     public long getTotalUncompressedSize() {
1396       return totalBlockUncompressedSize;
1397     }
1398
1399   }
1400
1401   /**
1402    * A single chunk of the block index in the process of being written. The
1403    * data in this chunk can become a leaf-level, intermediate-level, or root
1404    * index block.
1405    */
1406   static class BlockIndexChunk {
1407
1408     /** First keys of the key range corresponding to each index entry. */
1409     private final List<byte[]> blockKeys = new ArrayList<byte[]>();
1410
1411     /** Block offsets in the backing stream. */
1412     private final List<Long> blockOffsets = new ArrayList<Long>();
1413
1414     /** On-disk data sizes of lower-level data or index blocks. */
1415     private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
1416
1417     /**
1418      * The cumulative number of sub-entries, i.e. entries in deeper-level block
1419      * index blocks. numSubEntriesAt[i] is the total number of sub-entries in the
1420      * blocks corresponding to this chunk's entries #0 through #i inclusive.
1421      */
1422     private final List<Long> numSubEntriesAt = new ArrayList<Long>();
1423
1424     /**
1425      * The offset of the next entry to be added, relative to the end of the
1426      * "secondary index" in the "non-root" format representation of this index
1427      * chunk. This is the next value to be added to the secondary index.
1428      */
1429     private int curTotalNonRootEntrySize = 0;
1430
1431     /**
1432      * The accumulated size of this chunk if stored in the root index format.
1433      */
1434     private int curTotalRootSize = 0;
1435
1436     /**
1437      * The "secondary index" used for binary search over variable-length
1438      * records in a "non-root" format block. These offsets are relative to the
1439      * end of this secondary index.
1440      */
1441     private final List<Integer> secondaryIndexOffsetMarks =
1442         new ArrayList<Integer>();
1443
1444     /**
1445      * Adds a new entry to this block index chunk.
1446      *
1447      * @param firstKey the first key in the block pointed to by this entry
1448      * @param blockOffset the offset of the next-level block pointed to by this
1449      *          entry
1450      * @param onDiskDataSize the on-disk data size of the block pointed to by this
1451      *          entry, including header size
1452      * @param curTotalNumSubEntries if this chunk is the root index chunk under
1453      *          construction, this specifies the current total number of
1454      *          sub-entries in all leaf-level chunks, including the one
1455      *          corresponding to the second-level entry being added.
1456      */
1457     void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1458         long curTotalNumSubEntries) {
1459       // Record the offset for the secondary index
1460       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1461       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1462           + firstKey.length;
1463
1464       curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1465           + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1466
1467       blockKeys.add(firstKey);
1468       blockOffsets.add(blockOffset);
1469       onDiskDataSizes.add(onDiskDataSize);
1470
1471       if (curTotalNumSubEntries != -1) {
1472         numSubEntriesAt.add(curTotalNumSubEntries);
1473
1474         // Make sure the parallel arrays are in sync.
1475         if (numSubEntriesAt.size() != blockKeys.size()) {
1476           throw new IllegalStateException("Only have key/value count " +
1477               "stats for " + numSubEntriesAt.size() + " block index " +
1478               "entries out of " + blockKeys.size());
1479         }
1480       }
1481     }
1482
1483     /**
1484      * The same as {@link #add(byte[], long, int, long)} but does not maintain
1485      * the sub-entry count, which is only needed for multi-level indexes.
1486      * Used for single-level indexes.
1487      * @see #add(byte[], long, int, long)
1488      */
1489     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1490       add(firstKey, blockOffset, onDiskDataSize, -1);
1491     }
1492
1493     public void clear() {
1494       blockKeys.clear();
1495       blockOffsets.clear();
1496       onDiskDataSizes.clear();
1497       secondaryIndexOffsetMarks.clear();
1498       numSubEntriesAt.clear();
1499       curTotalNonRootEntrySize = 0;
1500       curTotalRootSize = 0;
1501     }
1502
1503     /**
1504      * Finds the entry corresponding to the deeper-level index block containing
1505      * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1506      * ordering of sub-entries.
1507      *
1508      * <p>
1509      * <i>Implementation note.</i> We are looking for i such that
1510      * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
1511      * block #i (0-based) contains sub-entries numSubEntriesAt[i - 1] through
1512      * numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
1513      * sub-entries. i is by definition the insertion point of k in
1514      * numSubEntriesAt.
1515      *
1516      * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1517      * @return the 0-based index of the entry corresponding to the given
1518      *         sub-entry
1519      */
1520     public int getEntryBySubEntry(long k) {
1521       // We define mid-key as the key corresponding to k'th sub-entry
1522       // (0-based).
1523
1524       int i = Collections.binarySearch(numSubEntriesAt, k);
1525
1526       // Exact match: numSubEntriesAt[i] == k. This means chunks #0 through
1527       // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1528       // is in the (i + 1)'th chunk.
1529       if (i >= 0)
1530         return i + 1;
1531
1532       // Inexact match. Return the insertion point.
1533       return -i - 1;
1534     }
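
    // Worked example (illustrative): with numSubEntriesAt = [10, 25, 40],
    // chunk #0 holds sub-entries 0..9, #1 holds 10..24, #2 holds 25..39.
    //   getEntryBySubEntry(7)  -> no exact match, insertion point 0 -> 0
    //   getEntryBySubEntry(10) -> exact match at i = 0 -> returns 0 + 1 = 1
    //   getEntryBySubEntry(24) -> no exact match, insertion point 1 -> 1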
1535
1536     /**
1537      * Used when writing the root block index of a multi-level block index.
1538      * Serializes additional information that allows the mid-key to be
1539      * identified efficiently.
1540      *
1541      * @return a few serialized fields for finding the mid-key
1542      * @throws IOException if the mid-key metadata could not be created
1543      */
1544     public byte[] getMidKeyMetadata() throws IOException {
1545       ByteArrayOutputStream baos = new ByteArrayOutputStream(
1546           MID_KEY_METADATA_SIZE);
1547       DataOutputStream baosDos = new DataOutputStream(baos);
1548       long totalNumSubEntries = numSubEntriesAt.isEmpty() ? 0
               : numSubEntriesAt.get(numSubEntriesAt.size() - 1);
1549       if (totalNumSubEntries == 0) {
1550         throw new IOException("No leaf-level entries, mid-key unavailable");
1551       }
1552       long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1553       int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1554
1555       baosDos.writeLong(blockOffsets.get(midKeyEntry));
1556       baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1557
1558       long numSubEntriesBefore = midKeyEntry > 0
1559           ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1560       long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1561       if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE)
1562       {
1563         throw new IOException("Could not identify mid-key index within the "
1564             + "leaf-level block containing mid-key: out of range ("
1565             + subEntryWithinEntry + ", numSubEntriesBefore="
1566             + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1567             + ")");
1568       }
1569
1570       baosDos.writeInt((int) subEntryWithinEntry);
1571
1572       if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1573         throw new IOException("Could not write mid-key metadata: size=" +
1574             baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1575       }
1576
1577       // Close just to be good citizens, although this has no effect.
1578       baos.close();
1579
1580       return baos.toByteArray();
1581     }
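
    // Sketch of the reader side (the helper and local names are hypothetical):
    // decoding the MID_KEY_METADATA_SIZE-byte blob produced above, in the same
    // field order as the writes in getMidKeyMetadata().
    static void exampleDecodeMidKeyMetadata(byte[] metadata) throws IOException {
      DataInputStream in =
          new DataInputStream(new java.io.ByteArrayInputStream(metadata));
      long midLeafOffset = in.readLong();   // offset of the block holding the mid-key
      int midLeafOnDiskSize = in.readInt(); // its on-disk size, including header
      int midKeyEntry = in.readInt();       // index of the mid-key within that block
    }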
1582
1583     /**
1584      * Writes the block index chunk in the non-root index block format. This
1585      * format contains the number of entries, an index of integer offsets
1586      * for quick binary search on variable-length records, and tuples of
1587      * block offset, on-disk block size, and the first key for each entry.
1588      *
1589      * @param out the output to write the non-root index block to
1590      * @throws IOException if writing to the output fails
1591      */
1592     void writeNonRoot(DataOutput out) throws IOException {
1593       // The number of entries in the block.
1594       out.writeInt(blockKeys.size());
1595
1596       if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1597         throw new IOException("Corrupted block index chunk writer: " +
1598             blockKeys.size() + " entries but " +
1599             secondaryIndexOffsetMarks.size() + " secondary index items");
1600       }
1601
1602       // For each entry, write a "secondary index" of relative offsets to the
1603       // entries from the end of the secondary index. This works because at
1604       // read time we read the number of entries and know where the secondary
1605       // index ends.
1606       for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
1607         out.writeInt(currentSecondaryIndex);
1608
1609       // We include one extra element in the secondary index to calculate the
1610       // size of each entry easily by subtracting adjacent secondary index elements.
1611       out.writeInt(curTotalNonRootEntrySize);
1612
1613       for (int i = 0; i < blockKeys.size(); ++i) {
1614         out.writeLong(blockOffsets.get(i));
1615         out.writeInt(onDiskDataSizes.get(i));
1616         out.write(blockKeys.get(i));
1617       }
1618     }
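
    // Sketch under stated assumptions: how a reader could locate entry i in a
    // block serialized by writeNonRoot(). Assumes buf starts at the entry
    // count written above; the helper name is hypothetical.
    static int exampleNonRootEntryOffset(ByteBuffer buf, int i) {
      int numEntries = buf.getInt(0);
      // The secondary index (numEntries + 1 ints) follows the entry count, so
      // entry data begins (numEntries + 2) ints from the start of the block.
      int entriesStart = Bytes.SIZEOF_INT * (numEntries + 2);
      // Each secondary index element is relative to the end of that index.
      return entriesStart + buf.getInt(Bytes.SIZEOF_INT * (1 + i));
    }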
1619
1620     /**
1621      * @return the size of this chunk if stored in the non-root index block
1622      *         format
1623      */
1624     int getNonRootSize() {
1625       return Bytes.SIZEOF_INT                          // Number of entries
1626           + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1627           + curTotalNonRootEntrySize;                  // All entries
1628     }
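
    // Arithmetic example (illustrative): two entries with keys of 8 and 12
    // bytes. Each entry contributes SECONDARY_INDEX_ENTRY_OVERHEAD (a long
    // offset plus an int size, 12 bytes) + its key length, so the total is
    // 4 (count) + 4 * 3 (secondary index) + (12 + 8) + (12 + 12) = 60 bytes.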
1629
1630     /**
1631      * Writes this chunk into the given output stream in the root block index
1632      * format. This format is similar to the {@link HFile} version 1 block
1633      * index format, except that we store on-disk size of the block instead of
1634      * its uncompressed size.
1635      *
1636      * @param out the data output stream to write the block index to. Typically
1637      *          a stream writing into an {@link HFile} block.
1638      * @throws IOException if writing to the output stream fails
1639      */
1640     void writeRoot(DataOutput out) throws IOException {
1641       for (int i = 0; i < blockKeys.size(); ++i) {
1642         out.writeLong(blockOffsets.get(i));
1643         out.writeInt(onDiskDataSizes.get(i));
1644         Bytes.writeByteArray(out, blockKeys.get(i));
1645       }
1646     }
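
    // Sketch (hypothetical helper): reading back one root-format entry written
    // by writeRoot(). Bytes.readByteArray reverses the vint-length-prefixed
    // key layout that Bytes.writeByteArray produces above.
    static void exampleReadRootEntry(DataInput in) throws IOException {
      long blockOffset = in.readLong();          // offset of the child block
      int onDiskDataSize = in.readInt();         // its on-disk data size
      byte[] firstKey = Bytes.readByteArray(in); // vint length + key bytes
    }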
1647
1648     /**
1649      * @return the size of this chunk if stored in the root index block format
1650      */
1651     int getRootSize() {
1652       return curTotalRootSize;
1653     }
1654
1655     /**
1656      * @return the number of entries in this block index chunk
1657      */
1658     public int getNumEntries() {
1659       return blockKeys.size();
1660     }
1661
1662     public byte[] getBlockKey(int i) {
1663       return blockKeys.get(i);
1664     }
1665
1666     public long getBlockOffset(int i) {
1667       return blockOffsets.get(i);
1668     }
1669
1670     public int getOnDiskDataSize(int i) {
1671       return onDiskDataSizes.get(i);
1672     }
1673
1674     public long getCumulativeNumKV(int i) {
1675       if (i < 0)
1676         return 0;
1677       return numSubEntriesAt.get(i);
1678     }
1679
1680   }
1681
1682   public static int getMaxChunkSize(Configuration conf) {
1683     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1684   }
1685
1686   public static int getMinIndexNumEntries(Configuration conf) {
1687     return conf.getInt(MIN_INDEX_NUM_ENTRIES_KEY, DEFAULT_MIN_INDEX_NUM_ENTRIES);
1688   }
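
  // Hedged configuration sketch: the helper and the chosen values below are
  // illustrative only, not recommendations from this class.
  static void exampleTuneIndexConfig(Configuration conf) {
    conf.setInt(MAX_CHUNK_SIZE_KEY, 64 * 1024); // smaller index blocks
    conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 16); // lower bound on fan-out
    assert getMaxChunkSize(conf) == 64 * 1024;
    assert getMinIndexNumEntries(conf) == 16;
  }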
1689 }