/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * Provides functionality to write ({@link BlockIndexWriter}) and read
 * ({@link BlockIndexReader}) single-level and multi-level block indexes.
 *
 * Examples of how to use the block index writer can be found in
 * {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
 * {@link HFileWriterImpl}. Examples of how to use the reader can be
 * found in {@link HFileReaderImpl} and
 * {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
 */
@InterfaceAudience.Private
public class HFileBlockIndex {

  private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);

  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;

  /**
   * The maximum size guideline for index blocks (leaf, intermediate, and
   * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
   */
  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";

  /**
   * The number of bytes stored in each "secondary index" entry in addition to
   * key bytes in the non-root index block format. The first long is the file
   * offset of the deeper-level block the entry points to, and the int that
   * follows is that block's on-disk size, not including the header.
   */
  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
      + Bytes.SIZEOF_LONG;

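  /*
   * For orientation: a sketch of the non-root index block layout assumed by
   * the readers below (illustrative, not normative; see BlockIndexChunk's
   * writeNonRoot() for the authoritative writer). A block with n entries is:
   *
   *   int n;                      // number of entries
   *   int[n + 1] secondaryIndex;  // entry offsets relative to the end of
   *                               // this array; entry i occupies
   *                               // [secondaryIndex[i], secondaryIndex[i+1])
   *   // then, for each entry:
   *   long offset;                // file offset of the deeper-level block
   *   int onDiskSize;             // its on-disk size, excluding header
   *   byte[] firstKey;            // first key of that block; its length is
   *                               // implied by the secondary index
   *
   * SECONDARY_INDEX_ENTRY_OVERHEAD is the 12 bytes of (offset, onDiskSize)
   * preceding the key in each entry.
   */
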
  /**
   * Error message when trying to use inline block API in single-level mode.
   */
  private static final String INLINE_BLOCKS_NOT_ALLOWED =
      "Inline blocks are not allowed in the single-level-only mode";

  /**
   * The size of a meta-data record used for finding the mid-key in a
   * multi-level index. Consists of the middle leaf-level index block offset
   * (long), its on-disk size without header included (int), and the mid-key
   * entry's zero-based index in that leaf index block.
   */
  private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
      2 * Bytes.SIZEOF_INT;

  /**
   * An implementation of the BlockIndexReader that deals with block keys that
   * are plain byte[], such as those of the meta block or the Bloom block for
   * ROW blooms. It does not need a CellComparator; Bytes.BYTES_RAWCOMPARATOR
   * is sufficient.
   */
  static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {

    private byte[][] blockKeys;

    public ByteArrayKeyBlockIndexReader(final int treeLevel,
        final CachingBlockReader cachingBlockReader) {
      this(treeLevel);
      this.cachingBlockReader = cachingBlockReader;
    }

    public ByteArrayKeyBlockIndexReader(final int treeLevel) {
      // Can be null for METAINDEX block
      searchTreeLevel = treeLevel;
    }

    @Override
    protected long calculateHeapSizeForBlockKeys(long heapSize) {
      // Calculating the size of blockKeys
      if (blockKeys != null) {
        heapSize += ClassSize.REFERENCE;
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);

        // Adding bytes
        for (byte[] key : blockKeys) {
          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
        }
      }
      return heapSize;
    }

    @Override
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * @param i
     *          from 0 to {@link #getRootBlockCount()} - 1
     */
    public byte[] getRootBlockKey(int i) {
      return blockKeys[i];
    }

    @Override
    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks, boolean pread, boolean isCompaction,
        DataBlockEncoding expectedDataBlockEncoding) throws IOException {
      // Not needed for this index type; meta and bloom blocks are located
      // via rootBlockContainingKey() instead.
      return null;
    }

    @Override
    public Cell midkey() throws IOException {
      // Not needed here
      return null;
    }

    @Override
    protected void initialize(int numEntries) {
      blockKeys = new byte[numEntries][];
    }

    @Override
    protected void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      blockKeys[rootCount] = key;
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    @Override
    public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
      int pos = Bytes.binarySearch(blockKeys, key, offset, length);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

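    /*
     * A self-contained sketch of the insertion-point arithmetic above, using
     * java.util.Arrays.binarySearch, which follows the same -(i + 1)
     * convention as Bytes.binarySearch (the keys here are hypothetical):
     *
     *   String[] keys = { "b", "d", "f" };
     *   Arrays.binarySearch(keys, "d");            // 1: exact match
     *   int pos = Arrays.binarySearch(keys, "e");  // -3 = -(2 + 1)
     *   int j = -pos - 1 - 1;                      // 1: "d" <= "e" < "f"
     *   -Arrays.binarySearch(keys, "a") - 1 - 1;   // -1: before first block
     */
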
    @Override
    public int rootBlockContainingKey(Cell key) {
      // Should not be called on this because here it deals only with byte[]
      throw new UnsupportedOperationException(
          "Cannot search for a key that is of Cell type. Only plain byte array keys " +
          "can be searched for");
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=").append(rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
            .append("\n  offset=").append(blockOffsets[i])
            .append(", dataSize=").append(blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }

  }

  /**
   * An implementation of the BlockIndexReader that deals with block keys that
   * are the key part of a cell, such as those of the data block index or the
   * ROW_COL Bloom blocks. It needs a CellComparator to work with the Cells.
   */
  static class CellBasedKeyBlockIndexReader extends BlockIndexReader {

    private Cell[] blockKeys;
    /** Pre-computed mid-key */
    private AtomicReference<Cell> midKey = new AtomicReference<Cell>();
    /** Needed for doing lookup on blocks. */
    private CellComparator comparator;

    public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel,
        final CachingBlockReader cachingBlockReader) {
      this(c, treeLevel);
      this.cachingBlockReader = cachingBlockReader;
    }

    public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
      // Can be null for METAINDEX block
      comparator = c;
      searchTreeLevel = treeLevel;
    }

    @Override
    protected long calculateHeapSizeForBlockKeys(long heapSize) {
      if (blockKeys != null) {
        heapSize += ClassSize.REFERENCE;
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);

        // Adding blockKeys
        for (Cell key : blockKeys) {
          heapSize += ClassSize.align(CellUtil.estimatedHeapSizeOf(key));
        }
      }
      // Add comparator and the midkey atomicreference
      heapSize += 2 * ClassSize.REFERENCE;
      return heapSize;
    }

    @Override
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * @param i
     *          from 0 to {@link #getRootBlockCount()} - 1
     */
    public Cell getRootBlockKey(int i) {
      return blockKeys[i];
    }

    @Override
    public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks, boolean pread, boolean isCompaction,
        DataBlockEncoding expectedDataBlockEncoding) throws IOException {
      int rootLevelIndex = rootBlockContainingKey(key);
      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
        return null;
      }

      // the next indexed key
      Cell nextIndexedKey = null;

      // Read the next-level (intermediate or leaf) index block.
      long currentOffset = blockOffsets[rootLevelIndex];
      int currentOnDiskSize = blockDataSizes[rootLevelIndex];

      if (rootLevelIndex < blockKeys.length - 1) {
        nextIndexedKey = blockKeys[rootLevelIndex + 1];
      } else {
        nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY;
      }

      int lookupLevel = 1; // How many levels deep we are in our lookup.
      int index = -1;

      HFileBlock block = null;
      boolean dataBlock = false;
      KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
      while (true) {
        try {
          if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
            // Avoid reading the same block again, even with caching turned off.
            // This is crucial for compaction-type workload which might have
            // caching turned off. This is like a one-block cache inside the
            // scanner.
            block = currentBlock;
          } else {
            // Call HFile's caching block reader API. We always cache index
            // blocks, otherwise we might get terrible performance.
            boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
            BlockType expectedBlockType;
            if (lookupLevel < searchTreeLevel - 1) {
              expectedBlockType = BlockType.INTERMEDIATE_INDEX;
            } else if (lookupLevel == searchTreeLevel - 1) {
              expectedBlockType = BlockType.LEAF_INDEX;
            } else {
              // this also accounts for ENCODED_DATA
              expectedBlockType = BlockType.DATA;
            }
            block =
                cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
                  isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
          }

          if (block == null) {
            throw new IOException("Failed to read block at offset " + currentOffset
                + ", onDiskSize=" + currentOnDiskSize);
          }

          // Found a data block, break the loop and check our level in the tree.
          if (block.getBlockType().isData()) {
            dataBlock = true;
            break;
          }

          // Not a data block. This must be a leaf-level or intermediate-level
          // index block. We don't allow going deeper than searchTreeLevel.
          if (++lookupLevel > searchTreeLevel) {
            throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
                + ", searchTreeLevel=" + searchTreeLevel);
          }

          // Locate the entry corresponding to the given key in the non-root
          // (leaf or intermediate-level) index block.
          ByteBuff buffer = block.getBufferWithoutHeader();
          index = locateNonRootIndexEntry(buffer, key, comparator);
          if (index == -1) {
            // TODO: build the error message from the Cell directly, without
            // converting it to a KeyValue first.
            KeyValue kv = KeyValueUtil.ensureKeyValue(key);
            throw new IOException("The key "
                + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
                + " is before the first key of the non-root index block " + block);
          }

          currentOffset = buffer.getLong();
          currentOnDiskSize = buffer.getInt();

          // Only update next indexed key if there is a next indexed key in the current level
          byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
          if (nonRootIndexedKey != null) {
            tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
            nextIndexedKey = tmpNextIndexKV;
          }
        } finally {
          if (!dataBlock) {
            // Return the block immediately if it is not the
            // data block
            cachingBlockReader.returnBlock(block);
          }
        }
      }

      if (lookupLevel != searchTreeLevel) {
        assert dataBlock;
        // We reached a data block before descending through all index levels,
        // so something is wrong with the retrieved block. Return it so that
        // its reference count can be decremented.
        cachingBlockReader.returnBlock(block);
        throw new IOException("Reached a data block at level " + lookupLevel +
            " but the number of levels is " + searchTreeLevel);
      }

      // set the next indexed key for the current block.
      BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
      return blockWithScanInfo;
    }

    @Override
    public Cell midkey() throws IOException {
      if (rootCount == 0)
        throw new IOException("HFile empty");

      Cell targetMidKey = this.midKey.get();
      if (targetMidKey != null) {
        return targetMidKey;
      }

      if (midLeafBlockOffset >= 0) {
        if (cachingBlockReader == null) {
          throw new IOException("Have to read the middle leaf block but " +
              "no block reader available");
        }

        // Caching, using pread, assuming this is not a compaction.
        HFileBlock midLeafBlock = cachingBlockReader.readBlock(
            midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
            BlockType.LEAF_INDEX, null);
        try {
          ByteBuff b = midLeafBlock.getBufferWithoutHeader();
          int numDataBlocks = b.getIntAfterPosition(0);
          int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
          // Subtract the (offset, onDiskSize) overhead to get the key length,
          // as in getNonRootIndexedKey().
          int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset
              - SECONDARY_INDEX_ENTRY_OVERHEAD;
          int keyOffset =
              Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
                  + SECONDARY_INDEX_ENTRY_OVERHEAD;
          byte[] bytes = b.toBytes(keyOffset, keyLen);
          targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
        } finally {
          cachingBlockReader.returnBlock(midLeafBlock);
        }
      } else {
        // The middle of the root-level index.
        targetMidKey = blockKeys[rootCount / 2];
      }

      this.midKey.set(targetMidKey);
      return targetMidKey;
    }

    @Override
    protected void initialize(int numEntries) {
      blockKeys = new Cell[numEntries];
    }

    /**
     * Adds a new entry to the root block index. Only used when reading.
     *
     * @param key Last key in the block
     * @param offset file offset where the block is stored
     * @param dataSize the uncompressed data size
     */
    @Override
    protected void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      // Create the blockKeys as Cells once when the reader is opened
      blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    @Override
    public int rootBlockContainingKey(final byte[] key, int offset, int length,
        CellComparator comp) {
      // This should always be called with a Cell, not with a byte[] key
      throw new UnsupportedOperationException("Cannot search for a plain byte array key. " +
          "Only Cell-based keys can be searched for");
    }

    @Override
    public int rootBlockContainingKey(Cell key) {
      // Here the comparator should not be null as this happens for the root-level block
      int pos = Bytes.binarySearch(blockKeys, key, comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=").append(rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(blockKeys[i])
            .append("\n  offset=").append(blockOffsets[i])
            .append(", dataSize=").append(blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }
  }

  /**
   * The reader will always hold the root level index in the memory. Index
   * blocks at all other levels will be cached in the LRU cache in practice,
   * although this API does not enforce that.
   *
   * All non-root (leaf and intermediate) index blocks contain what we call a
   * "secondary index": an array of offsets to the entries within the block.
   * This allows us to do binary search for the entry corresponding to the
   * given key without having to deserialize the block.
   */
  static abstract class BlockIndexReader implements HeapSize {

    protected long[] blockOffsets;
    protected int[] blockDataSizes;
    protected int rootCount = 0;

    // Mid-key metadata.
    protected long midLeafBlockOffset = -1;
    protected int midLeafBlockOnDiskSize = -1;
    protected int midKeyEntry = -1;

    /**
     * The number of levels in the block index tree. One if there is only root
     * level, two for root and leaf levels, etc.
     */
    protected int searchTreeLevel;

    /** A way to read {@link HFile} blocks at a given offset */
    protected CachingBlockReader cachingBlockReader;

    /**
     * @return true if the block index is empty.
     */
    public abstract boolean isEmpty();

    /**
     * Verifies that the block index is non-empty and throws an
     * {@link IllegalStateException} otherwise.
     */
    public void ensureNonEmpty() {
      if (isEmpty()) {
        throw new IllegalStateException("Block index is empty or not loaded");
      }
    }

    /**
     * Return the data block which contains this key. This function will only
     * be called when the HFile version is larger than 1.
     *
     * @param key the key we are looking for
     * @param currentBlock the current block, to avoid re-reading the same block
     * @param cacheBlocks whether the blocks read should be cached, if possible
     * @param pread whether to use positional read
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding
     * @return the data block containing the given key, or null if the key is
     *         not present in this file
     * @throws IOException
     */
    public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException {
      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
          cacheBlocks,
          pread, isCompaction, expectedDataBlockEncoding);
      if (blockWithScanInfo == null) {
        return null;
      } else {
        return blockWithScanInfo.getHFileBlock();
      }
    }

    /**
     * Return the BlockWithScanInfo, which contains the DataBlock along with
     * other scan info such as the nextIndexedKey. This function will only be
     * called when the HFile version is larger than 1.
     *
     * @param key
     *          the key we are looking for
     * @param currentBlock
     *          the current block, to avoid re-reading the same block
     * @param cacheBlocks whether the blocks read should be cached, if possible
     * @param pread whether to use positional read
     * @param isCompaction whether the read is part of a compaction
     * @param expectedDataBlockEncoding the data block encoding the caller is
     *          expecting the data block to be in, or null to not perform this
     *          check and return the block irrespective of the encoding.
     * @return the BlockWithScanInfo which contains the DataBlock with other
     *         scan info such as nextIndexedKey.
     * @throws IOException
     */
    public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
        boolean cacheBlocks,
        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
        throws IOException;

    /**
     * An approximation to the {@link HFile}'s mid-key. Operates on block
     * boundaries, and does not go inside blocks. In other words, returns the
     * first key of the middle block of the file.
     *
     * @return the first key of the middle block
     */
    public abstract Cell midkey() throws IOException;

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public long getRootBlockOffset(int i) {
      return blockOffsets[i];
    }

    /**
     * @param i zero-based index of a root-level block
     * @return the on-disk size of the root-level block for version 2, or the
     *         uncompressed size for version 1
     */
    public int getRootBlockDataSize(int i) {
      return blockDataSizes[i];
    }

    /**
     * @return the number of root-level blocks in this block index
     */
    public int getRootBlockCount() {
      return rootCount;
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key
     *          Key to find
     * @param comp
     *          the comparator to be used
     * @return Index of the root-level block containing <code>key</code>
     *         (between 0 and the number of blocks - 1), or -1 if this file
     *         does not contain the requested key.
     */
    // When we want to find the meta index block or bloom block for ROW bloom
    // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
    // CellComparator.
    public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
        CellComparator comp);

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key
     *          Key to find
     * @return Index of the root-level block containing <code>key</code>
     *         (between 0 and the number of blocks - 1), or -1 if this file
     *         does not contain the requested key.
     */
    // When we want to find the meta index block or bloom block for ROW bloom
    // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom
    // case we need the CellComparator.
    public int rootBlockContainingKey(final byte[] key, int offset, int length) {
      return rootBlockContainingKey(key, offset, length, null);
    }

    /**
     * Finds the root-level index block containing the given key.
     *
     * @param key
     *          Key to find
     */
    public abstract int rootBlockContainingKey(final Cell key);

    /**
     * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
     * @param nonRootIndex the non-root index block buffer
     * @param i the ith position
     * @return The indexed key at the ith position in the nonRootIndex.
     */
    protected byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
      int numEntries = nonRootIndex.getInt(0);
      if (i < 0 || i >= numEntries) {
        return null;
      }

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
      // Target key's offset relative to the end of secondary index
      int targetKeyRelOffset = nonRootIndex.getInt(
          Bytes.SIZEOF_INT * (i + 1));

      // The offset of the target key in the blockIndex buffer
      int targetKeyOffset = entriesOffset     // Skip secondary index
          + targetKeyRelOffset               // Skip all entries until the target
          + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

      // We subtract the two consecutive secondary index elements, which
      // gives us the size of the whole (offset, onDiskSize, key) tuple. We
      // then need to subtract the overhead of offset and onDiskSize.
      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
        targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

      // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
      return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
    }

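    /*
     * A worked example of the arithmetic above, with made-up numbers: say
     * numEntries = 3 and the secondary index is [0, 20, 41, 63]. Entries then
     * start at entriesOffset = 4 * (3 + 2) = 20 bytes into the block. For
     * i = 1, targetKeyRelOffset = 20, so the key starts at byte
     * 20 + 20 + SECONDARY_INDEX_ENTRY_OVERHEAD(12) = 52 and is
     * 41 - 20 - 12 = 9 bytes long.
     */
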
    /**
     * Performs a binary search over a non-root level index block. Utilizes the
     * secondary index, which records the offsets of (offset, onDiskSize,
     * firstKey) tuples of all entries.
     *
     * @param key
     *          the key we are searching for
     * @param nonRootIndex
     *          the non-root index block buffer, starting with the secondary
     *          index. The position is ignored.
     * @param comparator
     *          the comparator used to compare the key against index entries
     * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
     *         keys[i + 1], if keys is the array of all keys being searched, or
     *         -1 otherwise
     */
    static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
        CellComparator comparator) {

      int numEntries = nonRootIndex.getIntAfterPosition(0);
      int low = 0;
      int high = numEntries - 1;
      int mid = 0;

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // If we imagine that keys[-1] = -Infinity and
      // keys[numEntries] = Infinity, then we are maintaining an invariant that
      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
      ByteBufferedKeyOnlyKeyValue nonRootIndexkeyOnlyKV = new ByteBufferedKeyOnlyKeyValue();
      ObjectIntPair<ByteBuffer> pair = new ObjectIntPair<ByteBuffer>();
      while (low <= high) {
        mid = (low + high) >>> 1;

        // Midkey's offset relative to the end of secondary index
        int midKeyRelOffset = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 1));

        // The offset of the middle key in the blockIndex buffer
        int midKeyOffset = entriesOffset       // Skip secondary index
            + midKeyRelOffset                  // Skip all entries until mid
            + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size

        // We subtract the two consecutive secondary index elements, which
        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
        // then need to subtract the overhead of offset and onDiskSize.
        int midLength = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 2)) -
            midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

        // we have to compare in this order, because the comparator order
        // has special logic when the 'left side' is a special key.
        // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
        // done after HBASE-12224 & HBASE-12282
        // TODO avoid array call.
        nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
        nonRootIndexkeyOnlyKV.setKey(pair.getFirst(), pair.getSecond(), midLength);
        int cmp = comparator.compareKeyIgnoresMvcc(key, nonRootIndexkeyOnlyKV);

        // key lives above the midpoint
        if (cmp > 0)
          low = mid + 1; // Maintain the invariant that keys[low - 1] < key
        // key lives below the midpoint
        else if (cmp < 0)
          high = mid - 1; // Maintain the invariant that key < keys[high + 1]
        else
          return mid; // exact match
      }

      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
      // condition, low >= high + 1. Therefore, low = high + 1.

      if (low != high + 1) {
        throw new IllegalStateException("Binary search broken: low=" + low
            + " instead of " + (high + 1));
      }

      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
      int i = low - 1;

      // Some extra validation on the result.
      if (i < -1 || i >= numEntries) {
        throw new IllegalStateException("Binary search broken: result is " +
            i + " but expected to be between -1 and (numEntries - 1) = " +
            (numEntries - 1));
      }

      return i;
    }

    /**
     * Search for one key using the secondary index in a non-root block. In case
     * of success, positions the provided buffer at the entry of interest, where
     * the file offset and the on-disk size can be read.
     *
     * @param nonRootBlock
     *          a non-root block without header. Initial position does not
     *          matter.
     * @param key
     *          the key to search for
     * @return the index position where the given key was found, or -1 if the
     *         given key is before the first key in the block
     */
    static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
        CellComparator comparator) {
      int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);

      if (entryIndex != -1) {
        int numEntries = nonRootBlock.getIntAfterPosition(0);

        // The end of secondary index and the beginning of entries themselves.
        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

        // The offset of the entry we are interested in relative to the end of
        // the secondary index.
        int entryRelOffset = nonRootBlock
            .getIntAfterPosition(Bytes.SIZEOF_INT * (1 + entryIndex));

        nonRootBlock.position(entriesOffset + entryRelOffset);
      }

      return entryIndex;
    }

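    /*
     * Illustrative caller pattern (cf. loadDataBlockWithScanInfo above): after
     * a successful call the buffer is positioned at the entry itself, so the
     * deeper block's location can be read directly:
     *
     *   int index = locateNonRootIndexEntry(buffer, key, comparator);
     *   if (index != -1) {
     *     long nextOffset = buffer.getLong();   // file offset of deeper block
     *     int nextOnDiskSize = buffer.getInt(); // on-disk size, sans header
     *   }
     */
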
    /**
     * Read in the root-level index from the given input stream. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param in the buffered input stream or wrapped byte input stream
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readRootIndex(DataInput in, final int numEntries) throws IOException {
      blockOffsets = new long[numEntries];
      initialize(numEntries);
      blockDataSizes = new int[numEntries];

      // If index size is zero, no index was written.
      if (numEntries > 0) {
        for (int i = 0; i < numEntries; ++i) {
          long offset = in.readLong();
          int dataSize = in.readInt();
          byte[] key = Bytes.readByteArray(in);
          add(key, offset, dataSize);
        }
      }
    }

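    /*
     * For reference, the per-entry wire format consumed by the loop above
     * (Bytes.readByteArray reads a vint length followed by that many bytes):
     *
     *   long offset;     // in.readLong()
     *   int dataSize;    // in.readInt()
     *   vint keyLength;  // Bytes.readByteArray(in)
     *   byte[] key;      //   "
     */
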
    protected abstract void initialize(int numEntries);

    protected abstract void add(final byte[] key, final long offset, final int dataSize);

    /**
     * Read in the root-level index from the given block. Must match
     * what was written into the root level by
     * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
     * offset that function returned.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @return the buffered input stream or wrapped byte input stream
     * @throws IOException
     */
    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
      DataInputStream in = blk.getByteStream();
      readRootIndex(in, numEntries);
      return in;
    }

    /**
     * Read the root-level metadata of a multi-level block index. Based on
     * {@link #readRootIndex(DataInput, int)}, but also reads metadata
     * necessary to compute the mid-key in a multi-level index.
     *
     * @param blk the HFile block
     * @param numEntries the number of root-level index entries
     * @throws IOException
     */
    public void readMultiLevelIndexRoot(HFileBlock blk,
        final int numEntries) throws IOException {
      DataInputStream in = readRootIndex(blk, numEntries);
      // After reading the root index, subtract the checksum bytes from the
      // available count to determine whether mid-key metadata is present.
      int checkSumBytes = blk.totalChecksumBytes();
      if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
        // No mid-key metadata available.
        return;
      }
      midLeafBlockOffset = in.readLong();
      midLeafBlockOnDiskSize = in.readInt();
      midKeyEntry = in.readInt();
    }

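    /*
     * The optional mid-key metadata decoded above is a fixed 16-byte trailer
     * (MID_KEY_METADATA_SIZE = 8 + 4 + 4) after the root-level entries:
     *
     *   long midLeafBlockOffset;     // file offset of the middle leaf block
     *   int midLeafBlockOnDiskSize;  // its on-disk size, without header
     *   int midKeyEntry;             // zero-based entry index within it
     *
     * Single-level indexes omit the trailer, hence the available-bytes check.
     */
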
    @Override
    public long heapSize() {
      // The base class has three object references (blockOffsets,
      // blockDataSizes, cachingBlockReader) and two ints (rootCount,
      // searchTreeLevel); blockKeys, the comparator and the mid-key atomic
      // reference are accounted for in the subclasses.
      long heapSize = ClassSize.align(3 * ClassSize.REFERENCE +
          2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);

      // Mid-key metadata.
      heapSize += MID_KEY_METADATA_SIZE;

      heapSize = calculateHeapSizeForBlockKeys(heapSize);

      if (blockOffsets != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
            * Bytes.SIZEOF_LONG);
      }

      if (blockDataSizes != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
            * Bytes.SIZEOF_INT);
      }

      return ClassSize.align(heapSize);
    }

    protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
  }

  /**
   * Writes the block index into the output stream. Generates the tree from
   * the bottom up. The leaf level is written to disk as a sequence of inline
   * blocks, if it is larger than a certain number of bytes. If the leaf level
   * is not large enough, we write all entries to the root level instead.
   *
   * After all leaf blocks have been written, we end up with an index
   * referencing the resulting leaf index blocks. If that index is larger than
   * the allowed root index size, the writer will break it up into
   * reasonable-size intermediate-level index block chunks, write those chunks
   * out, and create another index referencing those chunks. This will be
   * repeated until the remaining index is small enough to become the root
   * index. However, in most practical cases we will only have leaf-level
   * blocks and the root index, or just the root index.
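   *
   * <p>A rough usage sketch (the block writer, output stream and loop over
   * data blocks are elided and hypothetical; see {@link HFileWriterImpl} for
   * real usage):
   *
   * <pre>
   * BlockIndexWriter idx = new BlockIndexWriter(blockWriter, cacheConf, name);
   * for (...) {  // for each data block written
   *   idx.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
   * }
   * long rootIndexOffset = idx.writeIndexBlocks(out);  // at close time
   * </pre>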
   */
  public static class BlockIndexWriter implements InlineBlockWriter {
    /**
     * While the index is being written, this represents the current block
     * index referencing all leaf blocks, with one exception. If the file is
     * being closed and there are not enough blocks to complete even a single
     * leaf block, no leaf blocks get written and this contains the entire
     * block index. After all levels of the index were written by
     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
     * root-level index.
     */
    private BlockIndexChunk rootChunk = new BlockIndexChunk();

    /**
     * Current leaf-level chunk. New entries referencing data blocks get added
     * to this chunk until it grows large enough to be written to disk.
     */
    private BlockIndexChunk curInlineChunk = new BlockIndexChunk();

    /**
     * The number of block index levels. This is one if there is only the root
     * level (even empty), two if there are a leaf level and a root level, and
     * higher if there are intermediate levels. This is only final after
     * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
     * initial value accounts for the root level, and will be increased to two
     * as soon as we find out there is a leaf-level in
     * {@link #blockWritten(long, int, int)}.
     */
    private int numLevels = 1;

    private HFileBlock.Writer blockWriter;
    private byte[] firstKey = null;

    /**
     * The total number of leaf-level entries, i.e. entries referenced by
     * leaf-level blocks. For the data block index this is equal to the number
     * of data blocks.
     */
    private long totalNumEntries;

    /** Total compressed size of all index blocks. */
    private long totalBlockOnDiskSize;

    /** Total uncompressed size of all index blocks. */
    private long totalBlockUncompressedSize;

    /** The maximum size guideline of all multi-level index blocks. */
    private int maxChunkSize;

    /** Whether we require this block index to always be single-level. */
    private boolean singleLevelOnly;

    /** CacheConfig, or null if cache-on-write is disabled */
    private CacheConfig cacheConf;

    /** Name to use for computing cache keys */
    private String nameForCaching;

    /** Creates a single-level block index writer */
    public BlockIndexWriter() {
      this(null, null, null);
      singleLevelOnly = true;
    }

    /**
     * Creates a multi-level block index writer.
     *
     * @param blockWriter the block writer to use to write index blocks
     * @param cacheConf used to determine when and how a block should be cached-on-write.
     */
    public BlockIndexWriter(HFileBlock.Writer blockWriter,
        CacheConfig cacheConf, String nameForCaching) {
      if ((cacheConf == null) != (nameForCaching == null)) {
        throw new IllegalArgumentException("Block cache and file name for " +
            "caching must be both specified or both null");
      }

      this.blockWriter = blockWriter;
      this.cacheConf = cacheConf;
      this.nameForCaching = nameForCaching;
      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
    }

    public void setMaxChunkSize(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("Invalid maximum index block size");
      }
      this.maxChunkSize = maxChunkSize;
    }

    /**
     * Writes the root level and intermediate levels of the block index into
     * the output stream, generating the tree from bottom up. Assumes that the
     * leaf level has been inline-written to the disk if there is enough data
     * for more than one leaf block. We iterate by breaking the current level
     * of the block index, starting with the index of all leaf-level blocks,
     * into chunks small enough to be written to disk, and generate its parent
     * level, until we end up with a level small enough to become the root
     * level.
     *
     * If the leaf level is not large enough, there is no inline block index
     * anymore, so we only write that level of block index to disk as the root
     * level.
     *
     * @param out FSDataOutputStream
     * @return position at which we entered the root-level index.
     * @throws IOException
     */
    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
        throw new IOException("Trying to write a multi-level block index, " +
            "but there are " + curInlineChunk.getNumEntries() + " entries in the " +
            "last inline chunk.");
      }

      // We need to get mid-key metadata before we create intermediate
      // indexes and overwrite the root chunk.
      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
          : null;

      if (curInlineChunk != null) {
        while (rootChunk.getRootSize() > maxChunkSize) {
          rootChunk = writeIntermediateLevel(out, rootChunk);
          numLevels += 1;
        }
      }

      // write the root level
      long rootLevelIndexPos = out.getPos();

      {
        DataOutput blockStream =
            blockWriter.startWriting(BlockType.ROOT_INDEX);
        rootChunk.writeRoot(blockStream);
        if (midKeyMetadata != null)
          blockStream.write(midKeyMetadata);
        blockWriter.writeHeaderAndData(out);
      }

      // Add root index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
          + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
          + " root-level entries, " + totalNumEntries + " total entries, "
          + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
          " on-disk size, "
          + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
          " total uncompressed size.");
      }
      return rootLevelIndexPos;
    }

    /**
     * Writes the block index data as a single level only. Does not do any
     * block framing.
     *
     * @param out the buffered output stream to write the index to. Typically a
     *          stream writing into an {@link HFile} block.
     * @param description a short description of the index being written. Used
     *          in a log message.
     * @throws IOException
     */
    public void writeSingleLevelIndex(DataOutput out, String description)
        throws IOException {
      expectNumLevels(1);

      if (!singleLevelOnly)
        throw new IOException("Single-level mode is turned off");

      if (rootChunk.getNumEntries() > 0)
        throw new IOException("Root-level entries already added in " +
            "single-level mode");

      rootChunk = curInlineChunk;
      curInlineChunk = new BlockIndexChunk();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a single-level " + description + " index with "
          + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
          + " bytes");
      }
      rootChunk.writeRoot(out);
    }

    /**
     * Split the current level of the block index into intermediate index
     * blocks of permitted size and write those blocks to disk. Return the next
     * level of the block index referencing those intermediate-level blocks.
     *
     * @param out the output stream to write the intermediate blocks to
     * @param currentLevel the current level of the block index, such as a
     *          chunk referencing all leaf-level index blocks
     * @return the parent level block index, which becomes the root index after
     *         a few (usually zero) iterations
     * @throws IOException
     */
    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
        BlockIndexChunk currentLevel) throws IOException {
      // Entries referencing intermediate-level blocks we are about to create.
      BlockIndexChunk parent = new BlockIndexChunk();

      // The current intermediate-level block index chunk.
      BlockIndexChunk curChunk = new BlockIndexChunk();

      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
        curChunk.add(currentLevel.getBlockKey(i),
            currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));

        if (curChunk.getRootSize() >= maxChunkSize)
          writeIntermediateBlock(out, parent, curChunk);
      }

      if (curChunk.getNumEntries() > 0) {
        writeIntermediateBlock(out, parent, curChunk);
      }

      return parent;
    }

    private void writeIntermediateBlock(FSDataOutputStream out,
        BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
      long beginOffset = out.getPos();
      DataOutputStream dos = blockWriter.startWriting(
          BlockType.INTERMEDIATE_INDEX);
      curChunk.writeNonRoot(dos);
      byte[] curFirstKey = curChunk.getBlockKey(0);
      blockWriter.writeHeaderAndData(out);

      if (cacheConf != null) {
        HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
        cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
          beginOffset), blockForCaching);
      }

      // Add intermediate index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize +=
          blockWriter.getUncompressedSizeWithoutHeader();

      // OFFSET is the beginning offset of the chunk of block index entries.
      // SIZE is the total byte size of the chunk of block index entries
      // + the secondary index size
      // FIRST_KEY is the first key in the chunk of block index
      // entries.
      parent.add(curFirstKey, beginOffset,
          blockWriter.getOnDiskSizeWithHeader());

      // clear current block index chunk
      curChunk.clear();
      curFirstKey = null;
    }

    /**
     * @return how many block index entries there are in the root level
     */
    public final int getNumRootEntries() {
      return rootChunk.getNumEntries();
    }

    /**
     * @return the number of levels in this block index.
     */
    public int getNumLevels() {
      return numLevels;
    }

    private void expectNumLevels(int expectedNumLevels) {
      if (numLevels != expectedNumLevels) {
        throw new IllegalStateException("Number of block index levels is "
            + numLevels + " but is expected to be " + expectedNumLevels);
      }
    }

    /**
     * Whether there is an inline block ready to be written. In general, we
     * write a leaf-level index block as an inline block as soon as its size
     * as serialized in the non-root format reaches a certain threshold.
     */
    @Override
    public boolean shouldWriteBlock(boolean closing) {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (curInlineChunk == null) {
        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
            "called with closing=true and then called again?");
      }

      if (curInlineChunk.getNumEntries() == 0) {
        return false;
      }

      // We do have some entries in the current inline chunk.
      if (closing) {
        if (rootChunk.getNumEntries() == 0) {
          // We did not add any leaf-level blocks yet. Instead of creating a
          // leaf level with one block, move these entries to the root level.

          expectNumLevels(1);
          rootChunk = curInlineChunk;
          curInlineChunk = null;  // Disallow adding any more index entries.
          return false;
        }

        return true;
      } else {
        return curInlineChunk.getNonRootSize() >= maxChunkSize;
      }
    }

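    /*
     * For context, a sketch of the inline-block cycle as driven by the HFile
     * writer (the driver's locals here are hypothetical):
     *
     *   if (indexWriter.shouldWriteBlock(closing)) {
     *     DataOutput dos = blockWriter.startWriting(indexWriter.getInlineBlockType());
     *     indexWriter.writeInlineBlock(dos);
     *     blockWriter.writeHeaderAndData(out);
     *     indexWriter.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
     *         blockWriter.getUncompressedSizeWithoutHeader());
     *   }
     */
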
    /**
     * Write out the current inline index block. Inline blocks are non-root
     * blocks, so the non-root index format is used.
     *
     * @param out the output stream to write the inline block to
     */
    @Override
    public void writeInlineBlock(DataOutput out) throws IOException {
      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      // Write the inline block index to the output stream in the non-root
      // index block format.
      curInlineChunk.writeNonRoot(out);

      // Save the first key of the inline block so that we can add it to the
      // parent-level index.
      firstKey = curInlineChunk.getBlockKey(0);

      // Start a new inline index block
      curInlineChunk.clear();
    }

    /**
     * Called after an inline block has been written so that we can add an
     * entry referring to that block to the parent-level index.
     */
    @Override
    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
      // Add leaf index block size
      totalBlockOnDiskSize += onDiskSize;
      totalBlockUncompressedSize += uncompressedSize;

      if (singleLevelOnly)
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      if (firstKey == null) {
        throw new IllegalStateException("Trying to add second-level index " +
            "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
            " but the first key was not set in writeInlineBlock");
      }

      if (rootChunk.getNumEntries() == 0) {
        // We are writing the first leaf block, so increase index level.
        expectNumLevels(1);
        numLevels = 2;
      }

      // Add another entry to the second-level index. Include the number of
      // entries in all previous leaf-level chunks for mid-key calculation.
      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
      firstKey = null;
    }

    @Override
    public BlockType getInlineBlockType() {
      return BlockType.LEAF_INDEX;
    }

    /**
     * Add one index entry to the current leaf-level block. When the leaf-level
     * block gets large enough, it will be flushed to disk as an inline block.
     *
     * @param firstKey the first key of the data block
     * @param blockOffset the offset of the data block
     * @param blockDataSize the on-disk size of the data block ({@link HFile}
     *          format version 2), or the uncompressed size of the data block (
     *          {@link HFile} format version 1).
     */
    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
      ++totalNumEntries;
    }

1329     /**
1330      * @throws IOException if we happened to write a multi-level index.
1331      */
1332     public void ensureSingleLevel() throws IOException {
1333       if (numLevels > 1) {
1334         throw new IOException("Wrote a " + numLevels + "-level index with " +
1335             rootChunk.getNumEntries() + " root-level entries, but " +
1336             "this is expected to be a single-level block index.");
1337       }
1338     }
1339 
1340     /**
1341      * @return true if we are using cache-on-write. This is configured by the
1342      *         caller of the constructor by either passing a valid cache
1343      *         configuration or null.
1344      */
1345     @Override
1346     public boolean getCacheOnWrite() {
1347       return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1348     }
1349 
1350     /**
1351      * The total uncompressed size of the root index block, intermediate-level
1352      * index blocks, and leaf-level index blocks.
1353      *
1354      * @return the total uncompressed size of all index blocks
1355      */
1356     public long getTotalUncompressedSize() {
1357       return totalBlockUncompressedSize;
1358     }
1359 
1360   }
1361 
1362   /**
1363    * A single chunk of the block index in the process of writing. The data in
1364    * this chunk can become a leaf-level, intermediate-level, or root index
1365    * block.
1366    */
1367   static class BlockIndexChunk {
1368 
1369     /** First keys of the key range corresponding to each index entry. */
1370     private final List<byte[]> blockKeys = new ArrayList<byte[]>();
1371 
1372     /** Block offset in backing stream. */
1373     private final List<Long> blockOffsets = new ArrayList<Long>();
1374 
1375     /** On-disk data sizes of lower-level data or index blocks. */
1376     private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
1377 
1378     /**
1379      * The cumulative number of sub-entries, i.e. entries in the deeper-level
1380      * blocks pointed to by this chunk's entries. numSubEntriesAt[i] is the number
1381      * of sub-entries in the blocks corresponding to entries #0 through #i, inclusive.
1382      */
1383     private final List<Long> numSubEntriesAt = new ArrayList<Long>();
1384 
1385     /**
1386      * The offset of the next entry to be added, relative to the end of the
1387      * "secondary index" in the "non-root" format representation of this index
1388      * chunk. This is the next value to be added to the secondary index.
1389      */
1390     private int curTotalNonRootEntrySize = 0;
1391 
1392     /**
1393      * The accumulated size of this chunk if stored in the root index format.
1394      */
1395     private int curTotalRootSize = 0;
1396 
1397     /**
1398      * The "secondary index" used for binary search over variable-length
1399      * records in a "non-root" format block. These offsets are relative to the
1400      * end of this secondary index.
1401      */
1402     private final List<Integer> secondaryIndexOffsetMarks =
1403         new ArrayList<Integer>();
1404 
1405     /**
1406      * Adds a new entry to this block index chunk.
1407      *
1408      * @param firstKey the first key in the block pointed to by this entry
1409      * @param blockOffset the offset of the next-level block pointed to by this
1410      *          entry
1411      * @param onDiskDataSize the on-disk data size of the block pointed to by this
1412      *          entry, including header size
1413      * @param curTotalNumSubEntries if this chunk is the root index chunk under
1414      *          construction, this specifies the current total number of
1415      *          sub-entries in all leaf-level chunks, including the one
1416      *          corresponding to the second-level entry being added, or -1 if sub-entry counts are not tracked
1417      */
1418     void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1419         long curTotalNumSubEntries) {
1420       // Record the offset for the secondary index
1421       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1422       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1423           + firstKey.length;
1424 
1425       curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1426           + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1427 
1428       blockKeys.add(firstKey);
1429       blockOffsets.add(blockOffset);
1430       onDiskDataSizes.add(onDiskDataSize);
1431 
1432       if (curTotalNumSubEntries != -1) {
1433         numSubEntriesAt.add(curTotalNumSubEntries);
1434 
1435         // Make sure the parallel arrays are in sync.
1436         if (numSubEntriesAt.size() != blockKeys.size()) {
1437           throw new IllegalStateException("Only have key/value count " +
1438               "stats for " + numSubEntriesAt.size() + " block index " +
1439               "entries out of " + blockKeys.size());
1440         }
1441       }
1442     }
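
    /*
     * Editorial worked example (assumed key length): adding an entry whose
     * firstKey is 10 bytes long grows curTotalNonRootEntrySize by
     * SECONDARY_INDEX_ENTRY_OVERHEAD (12 bytes) + 10 = 22 bytes, and
     * curTotalRootSize by 8 (long offset) + 4 (int on-disk size) + 1 (vint
     * length of a 10-byte key) + 10 = 23 bytes.
     */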
1443 
1444     /**
1445      * The same as {@link #add(byte[], long, int, long)} but does not track the
1446      * cumulative number of sub-entries. Used for single-level indexes.
1447      *
1448      * @see #add(byte[], long, int, long)
1449      */
1450     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1451       add(firstKey, blockOffset, onDiskDataSize, -1);
1452     }
1453 
1454     public void clear() {
1455       blockKeys.clear();
1456       blockOffsets.clear();
1457       onDiskDataSizes.clear();
1458       secondaryIndexOffsetMarks.clear();
1459       numSubEntriesAt.clear();
1460       curTotalNonRootEntrySize = 0;
1461       curTotalRootSize = 0;
1462     }
1463 
1464     /**
1465      * Finds the entry corresponding to the deeper-level index block containing
1466      * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1467      * ordering of sub-entries.
1468      *
1469      * <p>
1470      * <i>Implementation note.</i> We are looking for i such that
1471      * numSubEntriesAt[i - 1] &lt;= k &lt; numSubEntriesAt[i], because deeper-level
1472      * block #i (0-based) contains sub-entries #numSubEntriesAt[i - 1]
1473      * through #(numSubEntriesAt[i] - 1), assuming a global 0-based ordering of
1474      * sub-entries. i is, by definition, the insertion point of k in
1475      * numSubEntriesAt.
1476      *
1477      * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1478      * @return the 0-based index of the entry corresponding to the given
1479      *         sub-entry
1480      */
1481     public int getEntryBySubEntry(long k) {
1482       // We define mid-key as the key corresponding to k'th sub-entry
1483       // (0-based).
1484 
1485       int i = Collections.binarySearch(numSubEntriesAt, k);
1486 
1487       // Exact match: numSubEntriesAt[i] == k. This means chunks #0 through
1488       // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1489       // is in the (i + 1)'th chunk.
1490       if (i >= 0)
1491         return i + 1;
1492 
1493       // Inexact match. Return the insertion point.
1494       return -i - 1;
1495     }
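
    /*
     * Editorial example (illustrative values; not part of the original class):
     * three entries whose deeper-level blocks hold 3, 3, and 4 sub-entries
     * give numSubEntriesAt = [3, 6, 10], so entry #0 covers sub-entries
     * #0..#2, entry #1 covers #3..#5, and entry #2 covers #6..#9.
     */
    private static void exampleGetEntryBySubEntry() {
      BlockIndexChunk chunk = new BlockIndexChunk();
      chunk.add(Bytes.toBytes("a"), 0L, 4096, 3);     // cumulative: 3
      chunk.add(Bytes.toBytes("k"), 4096L, 4096, 6);  // cumulative: 6
      chunk.add(Bytes.toBytes("t"), 8192L, 4096, 10); // cumulative: 10
      assert chunk.getEntryBySubEntry(4) == 1; // insertion point of 4 is 1
      assert chunk.getEntryBySubEntry(6) == 2; // exact match at i = 1: i + 1
      assert chunk.getEntryBySubEntry(9) == 2; // last sub-entry of entry #2
    }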
1496 
1497     /**
1498      * Used when writing the root block index of a multi-level block index.
1499      * Serializes additional information that allows the mid-key to be
1500      * identified efficiently.
1501      *
1502      * @return a few serialized fields for finding the mid-key
1503      * @throws IOException if the mid-key metadata could not be created
1504      */
1505     public byte[] getMidKeyMetadata() throws IOException {
1506       ByteArrayOutputStream baos = new ByteArrayOutputStream(
1507           MID_KEY_METADATA_SIZE);
1508       DataOutputStream baosDos = new DataOutputStream(baos);
1509       long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
1510       if (totalNumSubEntries == 0) {
1511         throw new IOException("No leaf-level entries, mid-key unavailable");
1512       }
1513       long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1514       int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1515 
1516       baosDos.writeLong(blockOffsets.get(midKeyEntry));
1517       baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1518 
1519       long numSubEntriesBefore = midKeyEntry > 0
1520           ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1521       long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1522       if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE)
1523       {
1524         throw new IOException("Could not identify mid-key index within the "
1525             + "leaf-level block containing mid-key: out of range ("
1526             + subEntryWithinEntry + ", numSubEntriesBefore="
1527             + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1528             + ")");
1529       }
1530 
1531       baosDos.writeInt((int) subEntryWithinEntry);
1532 
1533       if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1534         throw new IOException("Could not write mid-key metadata: size=" +
1535             baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1536       }
1537 
1538       // Close just to be good citizens, although this has no effect.
1539       baos.close();
1540 
1541       return baos.toByteArray();
1542     }
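
    /*
     * Editorial worked example, continuing the illustrative chunk above with
     * numSubEntriesAt = [3, 6, 10]: totalNumSubEntries = 10, so midKeySubEntry
     * = (10 - 1) / 2 = 4, which getEntryBySubEntry resolves to entry #1;
     * numSubEntriesBefore = numSubEntriesAt[0] = 3, so subEntryWithinEntry =
     * 4 - 3 = 1. The metadata is then the 8-byte offset and 4-byte on-disk
     * size of entry #1's block followed by the 4-byte in-block index 1, for
     * MID_KEY_METADATA_SIZE = 16 bytes in total.
     */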
1543 
1544     /**
1545      * Writes the block index chunk in the non-root index block format. This
1546      * format contains the number of entries, an index of integer offsets
1547      * for quick binary search on variable-length records, and tuples of
1548      * block offset, on-disk block size, and the first key for each entry.
1549      *
1550      * @param out the output stream to write the index block to
1551      * @throws IOException if writing to the output stream fails
1552      */
1553     void writeNonRoot(DataOutput out) throws IOException {
1554       // The number of entries in the block.
1555       out.writeInt(blockKeys.size());
1556 
1557       if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1558         throw new IOException("Corrupted block index chunk writer: " +
1559             blockKeys.size() + " entries but " +
1560             secondaryIndexOffsetMarks.size() + " secondary index items");
1561       }
1562 
1563       // For each entry, write a "secondary index" of relative offsets to the
1564       // entries from the end of the secondary index. This works because at
1565       // read time we read the number of entries and know where the secondary
1566       // index ends.
1567       for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
1568         out.writeInt(currentSecondaryIndex);
1569 
1570       // We include one additional element in the secondary index so that the
1571       // size of each entry can be computed by subtracting consecutive elements.
1572       out.writeInt(curTotalNonRootEntrySize);
1573 
1574       for (int i = 0; i < blockKeys.size(); ++i) {
1575         out.writeLong(blockOffsets.get(i));
1576         out.writeInt(onDiskDataSizes.get(i));
1577         out.write(blockKeys.get(i));
1578       }
1579     }
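
    /*
     * Editorial sketch of the resulting non-root on-disk layout (n = number
     * of entries):
     *
     *   int        entry count n
     *   int[n + 1] secondary index: each entry's offset relative to the end
     *              of this array, plus a final element holding the total size
     *   n times:   long blockOffset, int onDiskDataSize, byte[] firstKey
     */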
1580 
1581     /**
1582      * @return the size of this chunk if stored in the non-root index block
1583      *         format
1584      */
1585     int getNonRootSize() {
1586       return Bytes.SIZEOF_INT                          // Number of entries
1587           + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1588           + curTotalNonRootEntrySize;                  // All entries
1589     }
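
    /*
     * Editorial worked example (assumed key lengths): two entries with 10- and
     * 12-byte first keys give curTotalNonRootEntrySize = (12 + 10) + (12 + 12)
     * = 46 bytes, so getNonRootSize() = 4 + 4 * (2 + 1) + 46 = 62 bytes.
     */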
1590 
1591     /**
1592      * Writes this chunk into the given output stream in the root block index
1593      * format. This format is similar to the {@link HFile} version 1 block
1594      * index format, except that we store the on-disk size of the block instead
1595      * of its uncompressed size.
1596      *
1597      * @param out the data output stream to write the block index to. Typically
1598      *          a stream writing into an {@link HFile} block.
1599      * @throws IOException if writing to the output stream fails
1600      */
1601     void writeRoot(DataOutput out) throws IOException {
1602       for (int i = 0; i < blockKeys.size(); ++i) {
1603         out.writeLong(blockOffsets.get(i));
1604         out.writeInt(onDiskDataSizes.get(i));
1605         Bytes.writeByteArray(out, blockKeys.get(i));
1606       }
1607     }
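
    /*
     * Editorial note: each root entry written above is a long block offset, an
     * int on-disk size, and the first key serialized by Bytes.writeByteArray
     * as a vint length followed by the key bytes. This is exactly what
     * curTotalRootSize accumulates in add(), so getRootSize() predicts the
     * serialized size of this method's output.
     */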
1608 
1609     /**
1610      * @return the size of this chunk if stored in the root index block format
1611      */
1612     int getRootSize() {
1613       return curTotalRootSize;
1614     }
1615 
1616     /**
1617      * @return the number of entries in this block index chunk
1618      */
1619     public int getNumEntries() {
1620       return blockKeys.size();
1621     }
1622 
1623     public byte[] getBlockKey(int i) {
1624       return blockKeys.get(i);
1625     }
1626 
1627     public long getBlockOffset(int i) {
1628       return blockOffsets.get(i);
1629     }
1630 
1631     public int getOnDiskDataSize(int i) {
1632       return onDiskDataSizes.get(i);
1633     }
1634 
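    /**
     * @return the cumulative number of sub-entries in this chunk's entries #0
     *         through #i, or 0 if i is negative
     */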
1635     public long getCumulativeNumKV(int i) {
1636       if (i < 0)
1637         return 0;
1638       return numSubEntriesAt.get(i);
1639     }
1640 
1641   }
1642 
1643   public static int getMaxChunkSize(Configuration conf) {
1644     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1645   }
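
  /*
   * Editorial usage sketch (a minimal example; conf is a hypothetical
   * Configuration instance):
   *
   *   Configuration conf = new Configuration();
   *   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
   *   int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf); // 65536
   */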
1646 }