1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FSDataOutputStream;
37  import org.apache.hadoop.hbase.ByteBufferedKeyOnlyKeyValue;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellComparator;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
43  import org.apache.hadoop.hbase.classification.InterfaceAudience;
44  import org.apache.hadoop.hbase.io.HeapSize;
45  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
46  import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
47  import org.apache.hadoop.hbase.nio.ByteBuff;
48  import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.ClassSize;
51  import org.apache.hadoop.hbase.util.ObjectIntPair;
52  import org.apache.hadoop.io.WritableUtils;
53  import org.apache.hadoop.util.StringUtils;
54  
55  /**
56   * Provides functionality to write ({@link BlockIndexWriter}) and read
57   * ({@link BlockIndexReader})
58   * single-level and multi-level block indexes.
59   *
60   * Examples of how to use the block index writer can be found in
61   * {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
62   *  {@link HFileWriterImpl}. Examples of how to use the reader can be
63   *  found in {@link HFileReaderImpl} and
64   *  {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
65   */
66  @InterfaceAudience.Private
67  public class HFileBlockIndex {
68  
69    private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
70  
71    static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
72  
73    /**
74    * The maximum size guideline for index blocks (leaf, intermediate, and
75     * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
76     */
77    public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
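  // A hypothetical tuning sketch (not part of this class; assumes a standard HBase
  // Configuration is what ultimately consumes this key):
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024); // e.g. 64 KB index blocks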
78  
79    /**
80     * The number of bytes stored in each "secondary index" entry in addition to
81     * key bytes in the non-root index block format. The first long is the file
82     * offset of the deeper-level block the entry points to, and the int that
83     * follows is that block's on-disk size without including header.
84     */
85    static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
86        + Bytes.SIZEOF_LONG;
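  // Concretely: Bytes.SIZEOF_LONG (8 bytes, deeper-level block offset) plus
  // Bytes.SIZEOF_INT (4 bytes, that block's on-disk size) = 12 bytes of overhead
  // per secondary-index entry, before the key bytes themselves.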
87  
88    /**
89     * Error message when trying to use inline block API in single-level mode.
90     */
91    private static final String INLINE_BLOCKS_NOT_ALLOWED =
92        "Inline blocks are not allowed in the single-level-only mode";
93  
94    /**
95     * The size of a meta-data record used for finding the mid-key in a
96     * multi-level index. Consists of the middle leaf-level index block offset
97     * (long), its on-disk size without header included (int), and the mid-key
98     * entry's zero-based index in that leaf index block.
99     */
100   private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
101       2 * Bytes.SIZEOF_INT;
102 
103   /**
104    * An implementation of the BlockIndexReader that deals with block keys which are plain
105    * byte[], like the meta index block or the Bloom block index for ROW blooms.
106    * Does not need a CellComparator; it can work with Bytes.BYTES_RAWCOMPARATOR.
107    */
108    static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {
109 
110     private byte[][] blockKeys;
111 
112     public ByteArrayKeyBlockIndexReader(final int treeLevel,
113         final CachingBlockReader cachingBlockReader) {
114       this(treeLevel);
115       this.cachingBlockReader = cachingBlockReader;
116     }
117 
118     public ByteArrayKeyBlockIndexReader(final int treeLevel) {
119       // No comparator is needed for byte[] keys (e.g. the META index block)
120       searchTreeLevel = treeLevel;
121     }
122 
123     protected long calculateHeapSizeForBlockKeys(long heapSize) {
124       // Calculating the size of blockKeys
125       if (blockKeys != null) {
126         heapSize += ClassSize.REFERENCE;
127         // Adding array + references overhead
128         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
129 
130         // Adding bytes
131         for (byte[] key : blockKeys) {
132           heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
133         }
134       }
135       return heapSize;
136     }
137 
138     @Override
139     public boolean isEmpty() {
140       return blockKeys.length == 0;
141     }
142 
143     /**
144      * @param i
145      *          from 0 to {@link #getRootBlockCount()} - 1
146      */
147     public byte[] getRootBlockKey(int i) {
148       return blockKeys[i];
149     }
150 
151     @Override
152     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
153         boolean cacheBlocks, boolean pread, boolean isCompaction,
154         DataBlockEncoding expectedDataBlockEncoding) throws IOException {
155       // Not needed for this byte[]-keyed (meta/bloom) index reader
156       return null;
157     }
158 
159     @Override
160     public Cell midkey() throws IOException {
161       // Not needed here
162       return null;
163     }
164 
165     protected void initialize(int numEntries) {
166       blockKeys = new byte[numEntries][];
167     }
168 
169     protected void add(final byte[] key, final long offset, final int dataSize) {
170       blockOffsets[rootCount] = offset;
171       blockKeys[rootCount] = key;
172       blockDataSizes[rootCount] = dataSize;
173       rootCount++;
174     }
175 
176     @Override
177     public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
178       int pos = Bytes.binarySearch(blockKeys, key, offset, length);
179       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
180       // binarySearch's javadoc.
181 
182       if (pos >= 0) {
183         // This means this is an exact match with an element of blockKeys.
184         assert pos < blockKeys.length;
185         return pos;
186       }
187 
188       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
189       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
190       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
191       // key < blockKeys[0], meaning the file does not contain the given key.
192 
193       int i = -pos - 1;
194       assert 0 <= i && i <= blockKeys.length;
195       return i - 1;
196     }
197 
198     @Override
199     public int rootBlockContainingKey(Cell key) {
200       // Should not be called on this because here it deals only with byte[]
201       throw new UnsupportedOperationException(
202           "Cannot search for a key that is of Cell type. Only plain byte array keys " +
203           "can be searched for");
204     }
205 
206     @Override
207     public String toString() {
208       StringBuilder sb = new StringBuilder();
209       sb.append("size=" + rootCount).append("\n");
210       for (int i = 0; i < rootCount; i++) {
211         sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
212             .append("\n  offset=").append(blockOffsets[i])
213             .append(", dataSize=" + blockDataSizes[i]).append("\n");
214       }
215       return sb.toString();
216     }
217 
218   }
219 
220   /**
221    * An implementation of the BlockIndexReader that deals with block keys which are the key
222    * part of a Cell, like the data block index or the ROW_COL Bloom block index.
223    * This needs a CellComparator to work with the Cells.
224    */
225    static class CellBasedKeyBlockIndexReader extends BlockIndexReader {
226 
227     private Cell[] blockKeys;
228     /** Pre-computed mid-key */
229     private AtomicReference<Cell> midKey = new AtomicReference<Cell>();
230     /** Needed when doing lookups on blocks. */
231     private CellComparator comparator;
232 
233     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel,
234         final CachingBlockReader cachingBlockReader) {
235       this(c, treeLevel);
236       this.cachingBlockReader = cachingBlockReader;
237     }
238 
239     public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
240       // Can be null for METAINDEX block
241       comparator = c;
242       searchTreeLevel = treeLevel;
243     }
244 
245     protected long calculateHeapSizeForBlockKeys(long heapSize) {
246       if (blockKeys != null) {
247         heapSize += ClassSize.REFERENCE;
248         // Adding array + references overhead
249         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
250 
251         // Adding blockKeys
252         for (Cell key : blockKeys) {
253           heapSize += ClassSize.align(CellUtil.estimatedHeapSizeOf(key));
254         }
255       }
256       // Add comparator and the midkey atomicreference
257       heapSize += 2 * ClassSize.REFERENCE;
258       return heapSize;
259     }
260 
261     @Override
262     public boolean isEmpty() {
263       return blockKeys.length == 0;
264     }
265 
266     /**
267      * @param i
268      *          from 0 to {@link #getRootBlockCount()} - 1
269      */
270     public Cell getRootBlockKey(int i) {
271       return blockKeys[i];
272     }
273
274     @Override
275     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
276         boolean cacheBlocks, boolean pread, boolean isCompaction,
277         DataBlockEncoding expectedDataBlockEncoding) throws IOException {
278       int rootLevelIndex = rootBlockContainingKey(key);
279       if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
280         return null;
281       }
282
283       // the next indexed key
284       Cell nextIndexedKey = null;
285
286       // Read the next-level (intermediate or leaf) index block.
287       long currentOffset = blockOffsets[rootLevelIndex];
288       int currentOnDiskSize = blockDataSizes[rootLevelIndex];
289
290       if (rootLevelIndex < blockKeys.length - 1) {
291         nextIndexedKey = blockKeys[rootLevelIndex + 1];
292       } else {
293         nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
294       }
295
296       int lookupLevel = 1; // How many levels deep we are in our lookup.
297       int index = -1;
298
299       HFileBlock block = null;
300       boolean dataBlock = false;
301       KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
302       while (true) {
303         try {
304           if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
305             // Avoid reading the same block again, even with caching turned off.
306             // This is crucial for compaction-type workload which might have
307             // caching turned off. This is like a one-block cache inside the
308             // scanner.
309             block = currentBlock;
310           } else {
311             // Call HFile's caching block reader API. We always cache index
312             // blocks, otherwise we might get terrible performance.
313             boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
314             BlockType expectedBlockType;
315             if (lookupLevel < searchTreeLevel - 1) {
316               expectedBlockType = BlockType.INTERMEDIATE_INDEX;
317             } else if (lookupLevel == searchTreeLevel - 1) {
318               expectedBlockType = BlockType.LEAF_INDEX;
319             } else {
320               // this also accounts for ENCODED_DATA
321               expectedBlockType = BlockType.DATA;
322             }
323             block =
324                 cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
325                   isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
326           }
327
328           if (block == null) {
329             throw new IOException("Failed to read block at offset " + currentOffset
330                 + ", onDiskSize=" + currentOnDiskSize);
331           }
332
333           // Found a data block, break the loop and check our level in the tree.
334           if (block.getBlockType().isData()) {
335             dataBlock = true;
336             break;
337           }
338
339           // Not a data block. This must be a leaf-level or intermediate-level
340           // index block. We don't allow going deeper than searchTreeLevel.
341           if (++lookupLevel > searchTreeLevel) {
342             throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
343                 + ", searchTreeLevel=" + searchTreeLevel);
344           }
345
346           // Locate the entry corresponding to the given key in the non-root
347           // (leaf or intermediate-level) index block.
348           ByteBuff buffer = block.getBufferWithoutHeader();
349           index = locateNonRootIndexEntry(buffer, key, comparator);
350           if (index == -1) {
351             // The key is before the first key of this non-root index block,
352             // which should not happen at this point in the search.
353             throw new IOException("The key "
354                 + CellUtil.getCellKeyAsString(key)
355                 + " is before the" + " first key of the non-root index block " + block);
356           }
357
358           currentOffset = buffer.getLong();
359           currentOnDiskSize = buffer.getInt();
360
361           // Only update next indexed key if there is a next indexed key in the current level
362           byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
363           if (nonRootIndexedKey != null) {
364             tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
365             nextIndexedKey = tmpNextIndexKV;
366           }
367         } finally {
368           if (!dataBlock) {
369             // Return the block immediately if it is not the
370             // data block
371             cachingBlockReader.returnBlock(block);
372           }
373         }
374       }
375
376       if (lookupLevel != searchTreeLevel) {
377         assert dataBlock == true;
378         // Though we have retrieved a data block, it is at the wrong level of
379         // the index. Return the block so that its ref count can be decremented
380         // before reporting the error.
381         cachingBlockReader.returnBlock(block);
382         throw new IOException("Reached a data block at level " + lookupLevel +
383             " but the number of levels is " + searchTreeLevel);
384       }
385
386       // set the next indexed key for the current block.
387       BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
388       return blockWithScanInfo;
389     }
390
391     @Override
392     public Cell midkey() throws IOException {
393       if (rootCount == 0)
394         throw new IOException("HFile empty");
395
396       Cell targetMidKey = this.midKey.get();
397       if (targetMidKey != null) {
398         return targetMidKey;
399       }
400
401       if (midLeafBlockOffset >= 0) {
402         if (cachingBlockReader == null) {
403           throw new IOException("Have to read the middle leaf block but " +
404               "no block reader available");
405         }
406
407         // Caching, using pread, assuming this is not a compaction.
408         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
409             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
410             BlockType.LEAF_INDEX, null);
411         try {
412           ByteBuff b = midLeafBlock.getBufferWithoutHeader();
413           int numDataBlocks = b.getIntAfterPosition(0);
414           int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1));
415           int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset;
416           int keyOffset =
417               Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
418                   + SECONDARY_INDEX_ENTRY_OVERHEAD;
419           byte[] bytes = b.toBytes(keyOffset, keyLen);
420           targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
421         } finally {
422           cachingBlockReader.returnBlock(midLeafBlock);
423         }
424       } else {
425         // The middle of the root-level index.
426         targetMidKey = blockKeys[rootCount / 2];
427       }
428
429       this.midKey.set(targetMidKey);
430       return targetMidKey;
431     }
432
433     protected void initialize(int numEntries) {
434       blockKeys = new Cell[numEntries];
435     }
436
437     /**
438      * Adds a new entry in the root block index. Only used when reading.
439      *
440      * @param key Last key in the block
441      * @param offset file offset where the block is stored
442      * @param dataSize the uncompressed data size
443      */
444     protected void add(final byte[] key, final long offset, final int dataSize) {
445       blockOffsets[rootCount] = offset;
446       // Create the blockKeys as Cells once when the reader is opened
447       blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
448       blockDataSizes[rootCount] = dataSize;
449       rootCount++;
450     }
451
452     @Override
453     public int rootBlockContainingKey(final byte[] key, int offset, int length,
454         CellComparator comp) {
455       // This should always be called with a Cell, not with a byte[] key
456       throw new UnsupportedOperationException("Cannot search for a plain byte array key. " +
457           "Only Cell-based keys can be searched for");
458     }
459
460     @Override
461     public int rootBlockContainingKey(Cell key) {
462       // The comparator should not be null here, since this is used for the root-level block
463       int pos = Bytes.binarySearch(blockKeys, key, comparator);
464       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
465       // binarySearch's javadoc.
466
467       if (pos >= 0) {
468         // This means this is an exact match with an element of blockKeys.
469         assert pos < blockKeys.length;
470         return pos;
471       }
472
473       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
474       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
475       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
476       // key < blockKeys[0], meaning the file does not contain the given key.
477
478       int i = -pos - 1;
479       assert 0 <= i && i <= blockKeys.length;
480       return i - 1;
481     }
482
483     @Override
484     public String toString() {
485       StringBuilder sb = new StringBuilder();
486       sb.append("size=" + rootCount).append("\n");
487       for (int i = 0; i < rootCount; i++) {
488         sb.append("key=").append((blockKeys[i]))
489             .append("\n  offset=").append(blockOffsets[i])
490             .append(", dataSize=" + blockDataSizes[i]).append("\n");
491       }
492       return sb.toString();
493     }
494   }
495    /**
496    * The reader will always hold the root-level index in memory. Index
497    * blocks at all other levels will be cached in the LRU cache in practice,
498    * although this API does not enforce that.
499    *
500    * All non-root (leaf and intermediate) index blocks contain what we call a
501    * "secondary index": an array of offsets to the entries within the block.
502    * This allows us to do binary search for the entry corresponding to the
503    * given key without having to deserialize the block.
504    */
505    static abstract class BlockIndexReader implements HeapSize {
506
507     protected long[] blockOffsets;
508     protected int[] blockDataSizes;
509     protected int rootCount = 0;
510
511     // Mid-key metadata.
512     protected long midLeafBlockOffset = -1;
513     protected int midLeafBlockOnDiskSize = -1;
514     protected int midKeyEntry = -1;
515
516     /**
517      * The number of levels in the block index tree. One if there is only root
518      * level, two for root and leaf levels, etc.
519      */
520     protected int searchTreeLevel;
521
522     /** A way to read {@link HFile} blocks at a given offset */
523     protected CachingBlockReader cachingBlockReader;
524
525     /**
526      * @return true if the block index is empty.
527      */
528     public abstract boolean isEmpty();
529
530     /**
531      * Verifies that the block index is non-empty and throws an
532      * {@link IllegalStateException} otherwise.
533      */
534     public void ensureNonEmpty() {
535       if (isEmpty()) {
536         throw new IllegalStateException("Block index is empty or not loaded");
537       }
538     }
539
540     /**
541      * Return the data block which contains this key. This function will only
542      * be called when the HFile version is larger than 1.
543      *
544      * @param key the key we are looking for
545      * @param currentBlock the current block, to avoid re-reading the same block
546      * @param cacheBlocks whether blocks read on the way should be cached
547      * @param pread whether to use positional read
548      * @param isCompaction whether this read is part of a compaction
549      * @param expectedDataBlockEncoding the data block encoding the caller is
550      *          expecting the data block to be in, or null to not perform this
551      *          check and return the block irrespective of the encoding
552      * @return the data block containing the given key, or null if this file does not contain it
553      * @throws IOException
554      */
555     public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
556         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
557         throws IOException {
558       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
559           cacheBlocks,
560           pread, isCompaction, expectedDataBlockEncoding);
561       if (blockWithScanInfo == null) {
562         return null;
563       } else {
564         return blockWithScanInfo.getHFileBlock();
565       }
566     }
567
568     /**
569      * Return the BlockWithScanInfo which contains the DataBlock with other scan
570      * info such as nextIndexedKey. This function will only be called when the
571      * HFile version is larger than 1.
572      * 
573      * @param key
574      *          the key we are looking for
575      * @param currentBlock
576      *          the current block, to avoid re-reading the same block
577      * @param cacheBlocks whether blocks read on the way should be cached
578      * @param pread whether to use positional read
579      * @param isCompaction whether this read is part of a compaction
580      * @param expectedDataBlockEncoding the data block encoding the caller is
581      *          expecting the data block to be in, or null to not perform this
582      *          check and return the block irrespective of the encoding.
583      * @return the BlockWithScanInfo which contains the DataBlock with other
584      *         scan info such as nextIndexedKey.
585      * @throws IOException
586      */
587     public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
588         boolean cacheBlocks,
589         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
590         throws IOException;
591
592     /**
593      * An approximation to the {@link HFile}'s mid-key. Operates on block
594      * boundaries, and does not go inside blocks. In other words, returns the
595      * first key of the middle block of the file.
596      *
597      * @return the first key of the middle block
598      */
599     public abstract Cell midkey() throws IOException;
600
601     /**
602      * @param i from 0 to {@link #getRootBlockCount()} - 1
603      */
604     public long getRootBlockOffset(int i) {
605       return blockOffsets[i];
606     }
607
608     /**
609      * @param i zero-based index of a root-level block
610      * @return the on-disk size of the root-level block for version 2, or the
611      *         uncompressed size for version 1
612      */
613     public int getRootBlockDataSize(int i) {
614       return blockDataSizes[i];
615     }
616
617     /**
618      * @return the number of root-level blocks in this block index
619      */
620     public int getRootBlockCount() {
621       return rootCount;
622     }
623
624     /**
625      * Finds the root-level index block containing the given key.
626      * 
627      * @param key
628      *          Key to find
629      * @param comp
630      *          the comparator to be used
631      * @return Index of the root-level block containing <code>key</code> (between 0
632      *         and the number of blocks - 1) or -1 if this file does not contain the
633      *         requested key.
634      */
635     // When we want to find the meta index block or bloom block for ROW bloom
636     // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
637     // CellComparator.
638     public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
639         CellComparator comp);
640
641     /**
642      * Finds the root-level index block containing the given key.
643      *
644      * @param key
645      *          Key to find
646      * @return Index of the root-level block containing <code>key</code> (between 0
647      *         and the number of blocks - 1) or -1 if this file does not contain the
648      *         requested key.
649      */
650     // When we want to find the meta index block or bloom block for ROW bloom
651     // type
652     // Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we
653     // need the CellComparator.
654     public int rootBlockContainingKey(final byte[] key, int offset, int length) {
655       return rootBlockContainingKey(key, offset, length, null);
656     }
657
658     /**
659      * Finds the root-level index block containing the given key.
660      *
661      * @param key
662      *          Key to find
663      */
664     public abstract int rootBlockContainingKey(final Cell key);
665
666     /**
667      * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
668      * @param nonRootIndex
669      * @param i the ith position
670      * @return The indexed key at the ith position in the nonRootIndex.
671      */
672     protected byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
673       int numEntries = nonRootIndex.getInt(0);
674       if (i < 0 || i >= numEntries) {
675         return null;
676       }
677
678       // Entries start after the number of entries and the secondary index.
679       // The secondary index takes numEntries + 1 ints.
680       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
681       // Targetkey's offset relative to the end of secondary index
682       int targetKeyRelOffset = nonRootIndex.getInt(
683           Bytes.SIZEOF_INT * (i + 1));
684
685       // The offset of the target key in the blockIndex buffer
686       int targetKeyOffset = entriesOffset     // Skip secondary index
687           + targetKeyRelOffset               // Skip all entries until mid
688           + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
689
690       // We subtract the two consecutive secondary index elements, which
691       // gives us the size of the whole (offset, onDiskSize, key) tuple. We
692       // then need to subtract the overhead of offset and onDiskSize.
693       int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
694         targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
695
696       // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
697       return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
698     }
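    // A worked example of the non-root layout consumed above (an illustrative sketch,
    // not data from any particular file): with numEntries = 2 and keys of 10 and 20
    // bytes, the block is
    //   [int numEntries = 2][secondary index: 3 ints = 0, 22, 54][entry 0][entry 1]
    // where each entry is (long offset, int onDiskSize, key bytes). Then
    //   entriesOffset = 4 * (2 + 2) = 16
    //   key 0: offset = 16 + 0  + 12 = 28, length = 22 - 0  - 12 = 10
    //   key 1: offset = 16 + 22 + 12 = 50, length = 54 - 22 - 12 = 20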
699
700     /**
701      * Performs a binary search over a non-root level index block. Utilizes the
702      * secondary index, which records the offsets of (offset, onDiskSize,
703      * firstKey) tuples of all entries.
704      * 
705      * @param key
706      *          the key we are searching for offsets to individual entries in
707      *          the blockIndex buffer
708      * @param nonRootIndex
709      *          the non-root index block buffer, starting with the secondary
710      *          index. The position is ignored.
711      * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
712      *         keys[i + 1], if keys is the array of all keys being searched, or
713      *         -1 otherwise
714      * @throws IOException
715      */
716     static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
717         CellComparator comparator) {
718
719       int numEntries = nonRootIndex.getIntAfterPosition(0);
720       int low = 0;
721       int high = numEntries - 1;
722       int mid = 0;
723
724       // Entries start after the number of entries and the secondary index.
725       // The secondary index takes numEntries + 1 ints.
726       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
727
728       // If we imagine that keys[-1] = -Infinity and
729       // keys[numEntries] = Infinity, then we are maintaining an invariant that
730       // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
731       ByteBufferedKeyOnlyKeyValue nonRootIndexkeyOnlyKV = new ByteBufferedKeyOnlyKeyValue();
732       ObjectIntPair<ByteBuffer> pair = new ObjectIntPair<ByteBuffer>();
733       while (low <= high) {
734         mid = (low + high) >>> 1;
735
736         // Midkey's offset relative to the end of secondary index
737         int midKeyRelOffset = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 1));
738
739         // The offset of the middle key in the blockIndex buffer
740         int midKeyOffset = entriesOffset       // Skip secondary index
741             + midKeyRelOffset                  // Skip all entries until mid
742             + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
743
744         // We subtract the two consecutive secondary index elements, which
745         // gives us the size of the whole (offset, onDiskSize, key) tuple. We
746         // then need to subtract the overhead of offset and onDiskSize.
747         int midLength = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 2)) -
748             midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
749
750         // we have to compare in this order, because the comparator order
751         // has special logic when the 'left side' is a special key.
752         // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
753         // done after HBASE-12224 & HBASE-12282
754         // TODO avoid array call.
755         nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
756         nonRootIndexkeyOnlyKV.setKey(pair.getFirst(), pair.getSecond(), midLength);
757         int cmp = comparator.compareKeyIgnoresMvcc(key, nonRootIndexkeyOnlyKV);
758
759         // key lives above the midpoint
760         if (cmp > 0)
761           low = mid + 1; // Maintain the invariant that keys[low - 1] < key
762         // key lives below the midpoint
763         else if (cmp < 0)
764           high = mid - 1; // Maintain the invariant that key < keys[high + 1]
765         else
766           return mid; // exact match
767       }
768
769       // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
770       // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
771       // condition, low >= high + 1. Therefore, low = high + 1.
772
773       if (low != high + 1) {
774         throw new IllegalStateException("Binary search broken: low=" + low
775             + " " + "instead of " + (high + 1));
776       }
777
778       // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
779       // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
780       int i = low - 1;
781
782       // Some extra validation on the result.
783       if (i < -1 || i >= numEntries) {
784         throw new IllegalStateException("Binary search broken: result is " +
785             i + " but expected to be between -1 and (numEntries - 1) = " +
786             (numEntries - 1));
787       }
788
789       return i;
790     }
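    // Illustration of the return convention above (an explanatory note, not code):
    // with non-root keys ["b", "d", "f"], searching for "c" returns 0 ("b" <= "c" < "d"),
    // "d" returns 1 (exact match), "g" returns 2 ("f" <= "g"), and "a" returns -1
    // because "a" sorts before the first key of the block.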
791
792     /**
793      * Search for one key using the secondary index in a non-root block. In case
794      * of success, positions the provided buffer at the entry of interest, where
795      * the file offset and the on-disk-size can be read.
796      *
797      * @param nonRootBlock
798      *          a non-root block without header. Initial position does not
799      *          matter.
800      * @param key
801      *          the key to search for (a Cell)
802      * @return the index position where the given key was found, otherwise
803      *         return -1 in the case the given key is before the first key.
804      *
805      */
806     static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
807         CellComparator comparator) {
808       int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
809
810       if (entryIndex != -1) {
811         int numEntries = nonRootBlock.getIntAfterPosition(0);
812
813         // The end of secondary index and the beginning of entries themselves.
814         int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
815
816         // The offset of the entry we are interested in relative to the end of
817         // the secondary index.
818         int entryRelOffset = nonRootBlock
819             .getIntAfterPosition(Bytes.SIZEOF_INT * (1 + entryIndex));
820
821         nonRootBlock.position(entriesOffset + entryRelOffset);
822       }
823
824       return entryIndex;
825     }
826
827     /**
828      * Read in the root-level index from the given input stream. Must match
829      * what was written into the root level by
830      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
831      * offset that function returned.
832      *
833      * @param in the buffered input stream or wrapped byte input stream
834      * @param numEntries the number of root-level index entries
835      * @throws IOException
836      */
837     public void readRootIndex(DataInput in, final int numEntries) throws IOException {
838       blockOffsets = new long[numEntries];
839       initialize(numEntries);
840       blockDataSizes = new int[numEntries];
841
842       // If index size is zero, no index was written.
843       if (numEntries > 0) {
844         for (int i = 0; i < numEntries; ++i) {
845           long offset = in.readLong();
846           int dataSize = in.readInt();
847           byte[] key = Bytes.readByteArray(in);
848           add(key, offset, dataSize);
849         }
850       }
851     }
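    // The root-level on-disk format consumed above, per entry:
    //   long offset   - file offset of the lower-level (or data) block
    //   int dataSize  - size of that block
    //   key           - read via Bytes.readByteArray(), i.e. a vint length
    //                   followed by the key bytes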
852
853     protected abstract void initialize(int numEntries);
854
855     protected abstract void add(final byte[] key, final long offset, final int dataSize);
856
857     /**
858      * Read in the root-level index from the given input stream. Must match
859      * what was written into the root level by
860      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
861      * offset that function returned.
862      *
863      * @param blk the HFile block
864      * @param numEntries the number of root-level index entries
865      * @return the buffered input stream or wrapped byte input stream
866      * @throws IOException
867      */
868     public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
869       DataInputStream in = blk.getByteStream();
870       readRootIndex(in, numEntries);
871       return in;
872     }
873
874     /**
875      * Read the root-level metadata of a multi-level block index. Based on
876      * {@link #readRootIndex(DataInput, int)}, but also reads metadata
877      * necessary to compute the mid-key in a multi-level index.
878      *
879      * @param blk the HFile block
880      * @param numEntries the number of root-level index entries
881      * @throws IOException
882      */
883     public void readMultiLevelIndexRoot(HFileBlock blk,
884         final int numEntries) throws IOException {
885       DataInputStream in = readRootIndex(blk, numEntries);
886       // after reading the root index the checksum bytes have to
887       // be subtracted to know if the mid key exists.
888       int checkSumBytes = blk.totalChecksumBytes();
889       if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
890         // No mid-key metadata available.
891         return;
892       }
893       midLeafBlockOffset = in.readLong();
894       midLeafBlockOnDiskSize = in.readInt();
895       midKeyEntry = in.readInt();
896     }
897
898     @Override
899     public long heapSize() {
900       // The base BlockIndexReader does not have the blockKeys, comparator or the midKey atomic reference
901       long heapSize = ClassSize.align(3 * ClassSize.REFERENCE +
902           2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
903
904       // Mid-key metadata.
905       heapSize += MID_KEY_METADATA_SIZE;
906
907       heapSize = calculateHeapSizeForBlockKeys(heapSize);
908
909       if (blockOffsets != null) {
910         heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
911             * Bytes.SIZEOF_LONG);
912       }
913
914       if (blockDataSizes != null) {
915         heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
916             * Bytes.SIZEOF_INT);
917       }
918
919       return ClassSize.align(heapSize);
920     }
921
922     protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
923   }
924
925   /**
926    * Writes the block index into the output stream. Generates the tree from the
927    * bottom up. The leaf level is written to disk as a sequence of inline
928    * blocks, if it is larger than a certain number of bytes. If the leaf level
929    * is not large enough, we write all entries to the root level instead.
930    *
931    * After all leaf blocks have been written, we end up with an index
932    * referencing the resulting leaf index blocks. If that index is larger than
933    * the allowed root index size, the writer will break it up into
934    * reasonable-size intermediate-level index block chunks, write those chunks
935    * out, and create another index referencing those chunks. This will be
936    * repeated until the remaining index is small enough to become the root
937    * index. However, in most practical cases we will only have leaf-level
938    * blocks and the root index, or just the root index.
939    */
940   public static class BlockIndexWriter implements InlineBlockWriter {
941     /**
942      * While the index is being written, this represents the current block
943      * index referencing all leaf blocks, with one exception. If the file is
944      * being closed and there are not enough blocks to complete even a single
945      * leaf block, no leaf blocks get written and this contains the entire
946      * block index. After all levels of the index were written by
947      * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
948      * root-level index.
949      */
950     private BlockIndexChunk rootChunk = new BlockIndexChunk();
951
952     /**
953      * Current leaf-level chunk. New entries referencing data blocks get added
954      * to this chunk until it grows large enough to be written to disk.
955      */
956     private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
957
958     /**
959      * The number of block index levels. This is one if there is only root
960      * level (even empty), two if there is a leaf level and root level, and is
961      * higher if there are intermediate levels. This is only final after
962      * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
963      * initial value accounts for the root level, and will be increased to two
964      * as soon as we find out there is a leaf-level in
965      * {@link #blockWritten(long, int, int)}.
966      */
967     private int numLevels = 1;
968
969     private HFileBlock.Writer blockWriter;
970     private byte[] firstKey = null;
971
972     /**
973      * The total number of leaf-level entries, i.e. entries referenced by
974      * leaf-level blocks. For the data block index this is equal to the number
975      * of data blocks.
976      */
977     private long totalNumEntries;
978
979     /** Total compressed size of all index blocks. */
980     private long totalBlockOnDiskSize;
981
982     /** Total uncompressed size of all index blocks. */
983     private long totalBlockUncompressedSize;
984
985     /** The maximum size guideline of all multi-level index blocks. */
986     private int maxChunkSize;
987
988     /** Whether we require this block index to always be single-level. */
989     private boolean singleLevelOnly;
990
991     /** CacheConfig, or null if cache-on-write is disabled */
992     private CacheConfig cacheConf;
993
994     /** Name to use for computing cache keys */
995     private String nameForCaching;
996
997     /** Creates a single-level block index writer */
998     public BlockIndexWriter() {
999       this(null, null, null);
1000       singleLevelOnly = true;
1001     }
1002
1003     /**
1004      * Creates a multi-level block index writer.
1005      *
1006      * @param blockWriter the block writer to use to write index blocks
1007      * @param cacheConf used to determine when and how a block should be cached-on-write.
1008      */
1009     public BlockIndexWriter(HFileBlock.Writer blockWriter,
1010         CacheConfig cacheConf, String nameForCaching) {
1011       if ((cacheConf == null) != (nameForCaching == null)) {
1012         throw new IllegalArgumentException("Block cache and file name for " +
1013             "caching must be both specified or both null");
1014       }
1015
1016       this.blockWriter = blockWriter;
1017       this.cacheConf = cacheConf;
1018       this.nameForCaching = nameForCaching;
1019       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
1020     }
1021
1022     public void setMaxChunkSize(int maxChunkSize) {
1023       if (maxChunkSize <= 0) {
1024         throw new IllegalArgumentException("Invald maximum index block size");
1025       }
1026       this.maxChunkSize = maxChunkSize;
1027     }
1028
1029     /**
1030      * Writes the root level and intermediate levels of the block index into
1031      * the output stream, generating the tree from bottom up. Assumes that the
1032      * leaf level has been inline-written to the disk if there is enough data
1033      * for more than one leaf block. We iterate by breaking the current level
1034      * of the block index, starting with the index of all leaf-level blocks,
1035      * into chunks small enough to be written to disk, and generate its parent
1036      * level, until we end up with a level small enough to become the root
1037      * level.
1038      *
1039      * If the leaf level is not large enough, there is no inline block index
1040      * anymore, so we only write that level of block index to disk as the root
1041      * level.
1042      *
1043      * @param out FSDataOutputStream
1044      * @return the offset in the output stream at which the root-level index starts.
1045      * @throws IOException
1046      */
1047     public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
1048       if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
1049         throw new IOException("Trying to write a multi-level block index, " +
1050             "but are " + curInlineChunk.getNumEntries() + " entries in the " +
1051             "last inline chunk.");
1052       }
1053
1054       // We need to get mid-key metadata before we create intermediate
1055       // indexes and overwrite the root chunk.
1056       byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
1057           : null;
1058
1059       if (curInlineChunk != null) {
1060         while (rootChunk.getRootSize() > maxChunkSize) {
1061           rootChunk = writeIntermediateLevel(out, rootChunk);
1062           numLevels += 1;
1063         }
1064       }
1065
1066       // write the root level
1067       long rootLevelIndexPos = out.getPos();
1068
1069       {
1070         DataOutput blockStream =
1071             blockWriter.startWriting(BlockType.ROOT_INDEX);
1072         rootChunk.writeRoot(blockStream);
1073         if (midKeyMetadata != null)
1074           blockStream.write(midKeyMetadata);
1075         blockWriter.writeHeaderAndData(out);
1076         if (cacheConf != null) {
1077           HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1078           cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
1079             rootLevelIndexPos, true, blockForCaching.getBlockType()), blockForCaching);
1080         }
1081       }
1082
1083       // Add root index block size
1084       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1085       totalBlockUncompressedSize +=
1086           blockWriter.getUncompressedSizeWithoutHeader();
1087
1088       if (LOG.isTraceEnabled()) {
1089         LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
1090           + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
1091           + " root-level entries, " + totalNumEntries + " total entries, "
1092           + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
1093           " on-disk size, "
1094           + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
1095           " total uncompressed size.");
1096       }
1097       return rootLevelIndexPos;
1098     }
1099
1100     /**
1101      * Writes the block index data as a single level only. Does not do any
1102      * block framing.
1103      *
1104      * @param out the buffered output stream to write the index to. Typically a
1105      *          stream writing into an {@link HFile} block.
1106      * @param description a short description of the index being written. Used
1107      *          in a log message.
1108      * @throws IOException
1109      */
1110     public void writeSingleLevelIndex(DataOutput out, String description)
1111         throws IOException {
1112       expectNumLevels(1);
1113
1114       if (!singleLevelOnly)
1115         throw new IOException("Single-level mode is turned off");
1116
1117       if (rootChunk.getNumEntries() > 0)
1118         throw new IOException("Root-level entries already added in " +
1119             "single-level mode");
1120
1121       rootChunk = curInlineChunk;
1122       curInlineChunk = new BlockIndexChunk();
1123
1124       if (LOG.isTraceEnabled()) {
1125         LOG.trace("Wrote a single-level " + description + " index with "
1126           + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
1127           + " bytes");
1128       }
1129       rootChunk.writeRoot(out);
1130     }
1131
1132     /**
1133      * Split the current level of the block index into intermediate index
1134      * blocks of permitted size and write those blocks to disk. Return the next
1135      * level of the block index referencing those intermediate-level blocks.
1136      *
1137      * @param out
1138      * @param currentLevel the current level of the block index, such as the
1139      *          chunk referencing all leaf-level index blocks
1140      * @return the parent level block index, which becomes the root index after
1141      *         a few (usually zero) iterations
1142      * @throws IOException
1143      */
1144     private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
1145         BlockIndexChunk currentLevel) throws IOException {
1146       // Entries referencing intermediate-level blocks we are about to create.
1147       BlockIndexChunk parent = new BlockIndexChunk();
1148
1149       // The current intermediate-level block index chunk.
1150       BlockIndexChunk curChunk = new BlockIndexChunk();
1151
1152       for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
1153         curChunk.add(currentLevel.getBlockKey(i),
1154             currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
1155
1156         if (curChunk.getRootSize() >= maxChunkSize)
1157           writeIntermediateBlock(out, parent, curChunk);
1158       }
1159
1160       if (curChunk.getNumEntries() > 0) {
1161         writeIntermediateBlock(out, parent, curChunk);
1162       }
1163
1164       return parent;
1165     }
1166
1167     private void writeIntermediateBlock(FSDataOutputStream out,
1168         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
1169       long beginOffset = out.getPos();
1170       DataOutputStream dos = blockWriter.startWriting(
1171           BlockType.INTERMEDIATE_INDEX);
1172       curChunk.writeNonRoot(dos);
1173       byte[] curFirstKey = curChunk.getBlockKey(0);
1174       blockWriter.writeHeaderAndData(out);
1175
1176       if (getCacheOnWrite()) {
1177         HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1178         cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
1179           beginOffset, true, blockForCaching.getBlockType()), blockForCaching);
1180       }
1181
1182       // Add intermediate index block size
1183       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1184       totalBlockUncompressedSize +=
1185           blockWriter.getUncompressedSizeWithoutHeader();
1186
1187       // OFFSET is the beginning offset of the chunk of block index entries.
1188       // SIZE is the total byte size of the chunk of block index entries
1189       // + the secondary index size
1190       // FIRST_KEY is the first key in the chunk of block index
1191       // entries.
1192       parent.add(curFirstKey, beginOffset,
1193           blockWriter.getOnDiskSizeWithHeader());
1194
1195       // clear current block index chunk
1196       curChunk.clear();
1197       curFirstKey = null;
1198     }
1199
1200     /**
1201      * @return how many block index entries there are in the root level
1202      */
1203     public final int getNumRootEntries() {
1204       return rootChunk.getNumEntries();
1205     }
1206
1207     /**
1208      * @return the number of levels in this block index.
1209      */
1210     public int getNumLevels() {
1211       return numLevels;
1212     }
1213
1214     private void expectNumLevels(int expectedNumLevels) {
1215       if (numLevels != expectedNumLevels) {
1216         throw new IllegalStateException("Number of block index levels is "
1217             + numLevels + "but is expected to be " + expectedNumLevels);
1218       }
1219     }
1220
1221     /**
1222      * Whether there is an inline block ready to be written. In general, we
1223      * write a leaf-level index block as an inline block as soon as its size
1224      * as serialized in the non-root format reaches a certain threshold.
1225      */
1226     @Override
1227     public boolean shouldWriteBlock(boolean closing) {
1228       if (singleLevelOnly) {
1229         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1230       }
1231
1232       if (curInlineChunk == null) {
1233         throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
1234             "called with closing=true and then called again?");
1235       }
1236
1237       if (curInlineChunk.getNumEntries() == 0) {
1238         return false;
1239       }
1240
1241       // We do have some entries in the current inline chunk.
1242       if (closing) {
1243         if (rootChunk.getNumEntries() == 0) {
1244           // We did not add any leaf-level blocks yet. Instead of creating a
1245           // leaf level with one block, move these entries to the root level.
1246
1247           expectNumLevels(1);
1248           rootChunk = curInlineChunk;
1249           curInlineChunk = null;  // Disallow adding any more index entries.
1250           return false;
1251         }
1252
1253         return true;
1254       } else {
1255         return curInlineChunk.getNonRootSize() >= maxChunkSize;
1256       }
1257     }
1258
1259     /**
1260      * Write out the current inline index block. Inline blocks are non-root
1261      * blocks, so the non-root index format is used.
1262      *
1263      * @param out
1264      */
1265     @Override
1266     public void writeInlineBlock(DataOutput out) throws IOException {
1267       if (singleLevelOnly)
1268         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1269
1270       // Write the inline block index to the output stream in the non-root
1271       // index block format.
1272       curInlineChunk.writeNonRoot(out);
1273
1274       // Save the first key of the inline block so that we can add it to the
1275       // parent-level index.
1276       firstKey = curInlineChunk.getBlockKey(0);
1277
1278       // Start a new inline index block
1279       curInlineChunk.clear();
1280     }
1281
1282     /**
1283      * Called after an inline block has been written so that we can add an
1284      * entry referring to that block to the parent-level index.
1285      */
1286     @Override
1287     public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
1288       // Add leaf index block size
1289       totalBlockOnDiskSize += onDiskSize;
1290       totalBlockUncompressedSize += uncompressedSize;
1291
1292       if (singleLevelOnly)
1293         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1294
1295       if (firstKey == null) {
1296         throw new IllegalStateException("Trying to add second-level index " +
1297             "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
1298             "but the first key was not set in writeInlineBlock");
1299       }
1300
1301       if (rootChunk.getNumEntries() == 0) {
1302         // We are writing the first leaf block, so increase index level.
1303         expectNumLevels(1);
1304         numLevels = 2;
1305       }
1306
1307       // Add another entry to the second-level index. Include the number of
1308       // entries in all previous leaf-level chunks for mid-key calculation.
1309       rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1310       firstKey = null;
1311     }
1312
1313     @Override
1314     public BlockType getInlineBlockType() {
1315       return BlockType.LEAF_INDEX;
1316     }
1317
1318     /**
1319      * Add one index entry to the current leaf-level block. When the leaf-level
1320      * block gets large enough, it will be flushed to disk as an inline block.
1321      *
1322      * @param firstKey the first key of the data block
1323      * @param blockOffset the offset of the data block
1324      * @param blockDataSize the on-disk size of the data block ({@link HFile}
1325      *          format version 2), or the uncompressed size of the data block (
1326      *          {@link HFile} format version 1).
1327      */
1328     public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
1329       curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1330       ++totalNumEntries;
1331     }
1332
1333     /**
1334      * @throws IOException if we happened to write a multi-level index.
1335      */
1336     public void ensureSingleLevel() throws IOException {
1337       if (numLevels > 1) {
1338         throw new IOException("Wrote a " + numLevels + "-level index with " +
1339             rootChunk.getNumEntries() + " root-level entries, but " +
1340             "this is expected to be a single-level block index.");
1341       }
1342     }
1343
1344     /**
1345      * @return true if we are using cache-on-write. This is configured by the
1346      *         caller of the constructor by passing a cache configuration (or
1347      *         null to disable caching).
1348      */
1349     @Override
1350     public boolean getCacheOnWrite() {
1351       return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1352     }
1353
1354     /**
1355      * The total uncompressed size of the root index block, intermediate-level
1356      * index blocks, and leaf-level index blocks.
1357      *
1358      * @return the total uncompressed size of all index blocks
1359      */
1360     public long getTotalUncompressedSize() {
1361       return totalBlockUncompressedSize;
1362     }
1363
1364   }
1365
1366   /**
1367    * A single chunk of the block index in the process of writing. The data in
1368    * this chunk can become a leaf-level, intermediate-level, or root index
1369    * block.
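     *
     * <p>A rough usage sketch (illustrative only; the output streams are assumed
     * to be provided by the caller):
     * <pre>
     * BlockIndexChunk chunk = new BlockIndexChunk();
     * chunk.add(firstKey, blockOffset, onDiskDataSize);  // once per lower-level block
     * // A chunk is serialized in exactly one of the two formats, depending on
     * // whether it becomes a root or a non-root index block:
     * chunk.writeNonRoot(nonRootOut);   // leaf / intermediate level
     * chunk.writeRoot(rootOut);         // root level
     * </pre>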
1370    */
1371   static class BlockIndexChunk {
1372
1373     /** First keys of the key range corresponding to each index entry. */
1374     private final List<byte[]> blockKeys = new ArrayList<byte[]>();
1375
1376     /** Block offset in backing stream. */
1377     private final List<Long> blockOffsets = new ArrayList<Long>();
1378
1379     /** On-disk data sizes of lower-level data or index blocks. */
1380     private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
1381
1382     /**
1383      * The cumulative number of sub-entries, i.e. entries in the deeper-level
1384      * blocks pointed to by this chunk's entries. numSubEntriesAt[i] is the number
1385      * of sub-entries in the blocks corresponding to entries #0 through #i inclusively.
1386      */
1387     private final List<Long> numSubEntriesAt = new ArrayList<Long>();
1388
1389     /**
1390      * The offset of the next entry to be added, relative to the end of the
1391      * "secondary index" in the "non-root" format representation of this index
1392      * chunk. This is the next value to be added to the secondary index.
1393      */
1394     private int curTotalNonRootEntrySize = 0;
1395
1396     /**
1397      * The accumulated size of this chunk if stored in the root index format.
1398      */
1399     private int curTotalRootSize = 0;
1400
1401     /**
1402      * The "secondary index" used for binary search over variable-length
1403      * records in a "non-root" format block. These offsets are relative to the
1404      * end of this secondary index.
1405      */
1406     private final List<Integer> secondaryIndexOffsetMarks =
1407         new ArrayList<Integer>();
1408
1409     /**
1410      * Adds a new entry to this block index chunk.
1411      *
1412      * @param firstKey the first key in the block pointed to by this entry
1413      * @param blockOffset the offset of the next-level block pointed to by this
1414      *          entry
1415      * @param onDiskDataSize the on-disk data size of the block pointed to by this
1416      *          entry, including header size
1417      * @param curTotalNumSubEntries if this chunk is the root index chunk under
1418      *          construction, this specifies the current total number of
1419      *          sub-entries in all leaf-level chunks, including the one
1420      *          corresponding to the second-level entry being added.
1421      */
1422     void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1423         long curTotalNumSubEntries) {
1424       // Record the offset for the secondary index
1425       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1426       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1427           + firstKey.length;
1428
1429       curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1430           + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1431
1432       blockKeys.add(firstKey);
1433       blockOffsets.add(blockOffset);
1434       onDiskDataSizes.add(onDiskDataSize);
1435
1436       if (curTotalNumSubEntries != -1) {
1437         numSubEntriesAt.add(curTotalNumSubEntries);
1438
1439         // Make sure the parallel arrays are in sync.
1440         if (numSubEntriesAt.size() != blockKeys.size()) {
1441           throw new IllegalStateException("Only have key/value count " +
1442               "stats for " + numSubEntriesAt.size() + " block index " +
1443               "entries out of " + blockKeys.size());
1444         }
1445       }
1446     }
1447
1448     /**
1449      * The same as {@link #add(byte[], long, int, long)} but does not keep track
1450      * of the sub-entry (key/value) count. Used for single-level indexes.
1451      *
1452      * @see #add(byte[], long, int, long)
1453      */
1454     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1455       add(firstKey, blockOffset, onDiskDataSize, -1);
1456     }
1457
1458     public void clear() {
1459       blockKeys.clear();
1460       blockOffsets.clear();
1461       onDiskDataSizes.clear();
1462       secondaryIndexOffsetMarks.clear();
1463       numSubEntriesAt.clear();
1464       curTotalNonRootEntrySize = 0;
1465       curTotalRootSize = 0;
1466     }
1467
1468     /**
1469      * Finds the entry corresponding to the deeper-level index block containing
1470      * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1471      * ordering of sub-entries.
1472      *
1473      * <p>
1474      * <i> Implementation note. </i> We are looking for i such that
1475      * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
1476      * block #i (0-based) contains sub-entries numSubEntriesAt[i - 1]
1477      * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
1478      * sub-entries. i is by definition the insertion point of k in
1479      * numSubEntriesAt.
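     *
     * <p>For example (hypothetical values), if numSubEntriesAt = [3, 7, 12],
     * entry #0 covers sub-entries 0..2, entry #1 covers 3..6, and entry #2
     * covers 7..11. Then getEntryBySubEntry(5) returns 1 (the insertion point of
     * an inexact match), and getEntryBySubEntry(7) returns 2 (exact match at
     * index 1, so sub-entry #7 is the first sub-entry of the next entry).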
1480      *
1481      * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1482      * @return the 0-based index of the entry corresponding to the given
1483      *         sub-entry
1484      */
1485     public int getEntryBySubEntry(long k) {
1486       // We define mid-key as the key corresponding to k'th sub-entry
1487       // (0-based).
1488
1489       int i = Collections.binarySearch(numSubEntriesAt, k);
1490
1491       // Exact match: numSubEntriesAt[i] = k. This means chunks #0 through
1492       // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1493       // is in the (i + 1)'th chunk.
1494       if (i >= 0)
1495         return i + 1;
1496
1497       // Inexact match. Return the insertion point.
1498       return -i - 1;
1499     }
1500
1501     /**
1502      * Used when writing the root block index of a multi-level block index.
1503      * Serializes additional information that allows the mid-key to be
1504      * identified efficiently.
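     *
     * <p>The layout written below is: a long with the offset of the leaf-level
     * block containing the mid-key, an int with that block's on-disk size, and
     * an int with the mid-key's index within that block, i.e.
     * MID_KEY_METADATA_SIZE bytes in total.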
1505      *
1506      * @return a few serialized fields for finding the mid-key
1507      * @throws IOException if the mid-key metadata could not be created
1508      */
1509     public byte[] getMidKeyMetadata() throws IOException {
1510       ByteArrayOutputStream baos = new ByteArrayOutputStream(
1511           MID_KEY_METADATA_SIZE);
1512       DataOutputStream baosDos = new DataOutputStream(baos);
1513       long totalNumSubEntries = numSubEntriesAt.isEmpty() ? 0 : numSubEntriesAt.get(blockKeys.size() - 1);
1514       if (totalNumSubEntries == 0) {
1515         throw new IOException("No leaf-level entries, mid-key unavailable");
1516       }
1517       long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1518       int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1519
1520       baosDos.writeLong(blockOffsets.get(midKeyEntry));
1521       baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1522
1523       long numSubEntriesBefore = midKeyEntry > 0
1524           ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1525       long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1526       if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE)
1527       {
1528         throw new IOException("Could not identify mid-key index within the "
1529             + "leaf-level block containing mid-key: out of range ("
1530             + subEntryWithinEntry + ", numSubEntriesBefore="
1531             + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1532             + ")");
1533       }
1534
1535       baosDos.writeInt((int) subEntryWithinEntry);
1536
1537       if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1538         throw new IOException("Could not write mid-key metadata: size=" +
1539             baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1540       }
1541
1542       // Close just to be good citizens, although this has no effect.
1543       baos.close();
1544
1545       return baos.toByteArray();
1546     }
1547
1548     /**
1549      * Writes the block index chunk in the non-root index block format. This
1550      * format contains the number of entries, an index of integer offsets
1551      * for quick binary search on variable-length records, and tuples of
1552      * block offset, on-disk block size, and the first key for each entry.
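     *
     * <p>The resulting on-disk layout (a sketch of what the writes below
     * produce) is:
     * <pre>
     * int                   entryCount
     * int[entryCount + 1]   "secondary index": offset of each entry relative to
     *                       the end of the secondary index, plus one trailing
     *                       element holding the total size of all entries
     * entryCount times:     long blockOffset, int onDiskDataSize, byte[] firstKey
     * </pre>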
1553      *
1554      * @param out the output to write the non-root index block to
1555      * @throws IOException if writing to the output fails
1556      */
1557     void writeNonRoot(DataOutput out) throws IOException {
1558       // The number of entries in the block.
1559       out.writeInt(blockKeys.size());
1560
1561       if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1562         throw new IOException("Corrupted block index chunk writer: " +
1563             blockKeys.size() + " entries but " +
1564             secondaryIndexOffsetMarks.size() + " secondary index items");
1565       }
1566
1567       // For each entry, write a "secondary index" of relative offsets to the
1568       // entries from the end of the secondary index. This works, because at
1569       // read time we read the number of entries and know where the secondary
1570       // index ends.
1571       for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
1572         out.writeInt(currentSecondaryIndex);
1573
1574       // We include one extra element in the secondary index so that the size of
1575       // each entry can be computed by subtracting adjacent secondary index elements.
1576       out.writeInt(curTotalNonRootEntrySize);
1577
1578       for (int i = 0; i < blockKeys.size(); ++i) {
1579         out.writeLong(blockOffsets.get(i));
1580         out.writeInt(onDiskDataSizes.get(i));
1581         out.write(blockKeys.get(i));
1582       }
1583     }
1584
1585     /**
1586      * @return the size of this chunk if stored in the non-root index block
1587      *         format
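     *         (for example, hypothetically assuming the per-entry overhead is a
     *         long plus an int, i.e. 12 bytes, a chunk with 2 entries whose keys
     *         are 10 bytes each would occupy 4 + 4 * (2 + 1) + 2 * (12 + 10) = 60
     *         bytes in this format)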
1588      */
1589     int getNonRootSize() {
1590       return Bytes.SIZEOF_INT                          // Number of entries
1591           + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1592           + curTotalNonRootEntrySize;                  // All entries
1593     }
1594
1595     /**
1596      * Writes this chunk into the given output stream in the root block index
1597      * format. This format is similar to the {@link HFile} version 1 block
1598      * index format, except that we store on-disk size of the block instead of
1599      * its uncompressed size.
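     *
     * <p>Each entry is written below as a long block offset, an int on-disk data
     * size, and the first key as a vInt-length-prefixed byte array (via
     * {@link Bytes#writeByteArray(DataOutput, byte[])}).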
1600      *
1601      * @param out the data output stream to write the block index to. Typically
1602      *          a stream writing into an {@link HFile} block.
1603      * @throws IOException if writing to the output fails
1604      */
1605     void writeRoot(DataOutput out) throws IOException {
1606       for (int i = 0; i < blockKeys.size(); ++i) {
1607         out.writeLong(blockOffsets.get(i));
1608         out.writeInt(onDiskDataSizes.get(i));
1609         Bytes.writeByteArray(out, blockKeys.get(i));
1610       }
1611     }
1612
1613     /**
1614      * @return the size of this chunk if stored in the root index block format
1615      */
1616     int getRootSize() {
1617       return curTotalRootSize;
1618     }
1619
1620     /**
1621      * @return the number of entries in this block index chunk
1622      */
1623     public int getNumEntries() {
1624       return blockKeys.size();
1625     }
1626
1627     public byte[] getBlockKey(int i) {
1628       return blockKeys.get(i);
1629     }
1630
1631     public long getBlockOffset(int i) {
1632       return blockOffsets.get(i);
1633     }
1634
1635     public int getOnDiskDataSize(int i) {
1636       return onDiskDataSizes.get(i);
1637     }
1638
1639     public long getCumulativeNumKV(int i) {
1640       if (i < 0)
1641         return 0;
1642       return numSubEntriesAt.get(i);
1643     }
1644
1645   }
1646
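  /**
   * @param conf the configuration to read the chunk size limit from
   * @return the maximum index block (chunk) size, taken from
   *         {@link #MAX_CHUNK_SIZE_KEY}, or {@link #DEFAULT_MAX_CHUNK_SIZE} if
   *         the key is not set; e.g. {@code conf.setInt(MAX_CHUNK_SIZE_KEY, 64 * 1024)}
   *         would lower the limit (illustrative value)
   */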
1647   public static int getMaxChunkSize(Configuration conf) {
1648     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1649   }
1650 }