1   /*
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.io.hfile;
21  
22  import java.io.ByteArrayOutputStream;
23  import java.io.DataInput;
24  import java.io.DataInputStream;
25  import java.io.DataOutput;
26  import java.io.DataOutputStream;
27  import java.io.IOException;
28  import java.nio.ByteBuffer;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.List;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FSDataOutputStream;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
43  import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
44  import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.ClassSize;
47  import org.apache.hadoop.hbase.util.CompoundBloomFilterWriter;
48  import org.apache.hadoop.io.RawComparator;
49  import org.apache.hadoop.io.WritableUtils;
50  import org.apache.hadoop.util.StringUtils;
51  
52  /**
53   * Provides functionality to write ({@link BlockIndexWriter}) and read
54   * ({@link BlockIndexReader}) single-level and multi-level block indexes.
55   *
56   * Examples of how to use the block index writer can be found in
57   * {@link CompoundBloomFilterWriter} and {@link HFileWriterV2}. Examples of how
58   * to use the reader can be found in {@link HFileReaderV2} and
59   * TestHFileBlockIndex.
60   */
61  public class HFileBlockIndex {
62  
63    private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
64  
65    static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
66  
67    /**
68     * The maximum size guideline for index blocks (leaf, intermediate, and
69     * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
70     */
71    public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
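      // Illustrative usage (added for clarity, not part of the original source):
      // the chunk size can be tuned through the configuration, e.g.
      //   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
      //   int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf);  // 65536
      // getMaxChunkSize(Configuration) at the bottom of this class falls back to
      // DEFAULT_MAX_CHUNK_SIZE when the key is absent.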
72  
73    /**
74     * The number of bytes stored in each "secondary index" entry in addition to
75     * key bytes in the non-root index block format. The first long is the file
76     * offset of the deeper-level block the entry points to, and the int that
77     * follows is that block's on-disk size without including header.
78     */
79    static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
80        + Bytes.SIZEOF_LONG;
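      // Sketch of the non-root ("leaf" / intermediate) index block body, as written
      // by BlockIndexChunk#writeNonRoot and decoded by the reader below (added for
      // clarity, derived from this file):
      //
      //   int                 numEntries
      //   int[numEntries + 1] secondary index: byte offsets of each entry, relative
      //                       to the end of this array; the extra element marks the
      //                       total size, so entry lengths can be computed by
      //                       subtracting neighbouring offsets
      //   numEntries times:
      //     long   offset of the deeper-level block
      //     int    on-disk size of that block, header not included
      //     byte[] first key of that block (length implied by the secondary index)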
81  
82    /**
83     * Error message when trying to use inline block API in single-level mode.
84     */
85    private static final String INLINE_BLOCKS_NOT_ALLOWED =
86        "Inline blocks are not allowed in the single-level-only mode";
87  
88    /**
89     * The size of a meta-data record used for finding the mid-key in a
90     * multi-level index. Consists of the middle leaf-level index block offset
91     * (long), its on-disk size without header included (int), and the mid-key
92     * entry's zero-based index in that leaf index block.
93     */
94    private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
95        2 * Bytes.SIZEOF_INT;
96  
97    /**
98     * The reader will always hold the root-level index in memory. Index
99     * blocks at all other levels will be cached in the LRU cache in practice,
100    * although this API does not enforce that.
101    *
102    * All non-root (leaf and intermediate) index blocks contain what we call a
103    * "secondary index": an array of offsets to the entries within the block.
104    * This allows us to do binary search for the entry corresponding to the
105    * given key without having to deserialize the block.
106    */
107   public static class BlockIndexReader implements HeapSize {
108     /** Needed for doing lookups on blocks. */
109     private final RawComparator<byte[]> comparator;
110 
111     // Root-level data.
112     private byte[][] blockKeys;
113     private long[] blockOffsets;
114     private int[] blockDataSizes;
115     private int rootByteSize = 0;
116     private int rootCount = 0;
117 
118     // Mid-key metadata.
119     private long midLeafBlockOffset = -1;
120     private int midLeafBlockOnDiskSize = -1;
121     private int midKeyEntry = -1;
122 
123     /** Pre-computed mid-key */
124     private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();
125 
126     /**
127      * The number of levels in the block index tree. One if there is only a root
128      * level, two for root and leaf levels, etc.
129      */
130     private int searchTreeLevel;
131 
132     /** A way to read {@link HFile} blocks at a given offset */
133     private CachingBlockReader cachingBlockReader;
134 
135     public BlockIndexReader(final RawComparator<byte[]> c, final int treeLevel,
136         final CachingBlockReader cachingBlockReader) {
137       this(c, treeLevel);
138       this.cachingBlockReader = cachingBlockReader;
139     }
140 
141     public BlockIndexReader(final RawComparator<byte[]> c, final int treeLevel)
142     {
143       comparator = c;
144       searchTreeLevel = treeLevel;
145     }
146 
147     /**
148      * @return true if the block index is empty.
149      */
150     public boolean isEmpty() {
151       return blockKeys.length == 0;
152     }
153 
154     /**
155      * Verifies that the block index is non-empty and throws an
156      * {@link IllegalStateException} otherwise.
157      */
158     public void ensureNonEmpty() {
159       if (blockKeys.length == 0) {
160         throw new IllegalStateException("Block index is empty or not loaded");
161       }
162     }
163 
164     /**
165      * Return the data block which contains this key. This function will only
166      * be called when the HFile version is larger than 1.
167      *
168      * @param key the key we are looking for
169      * @param keyOffset the offset of the key in its byte array
170      * @param keyLength the length of the key
171      * @param currentBlock the current block, to avoid re-reading the same
172      *          block
173      * @return the data block containing the given key, or null if not found
174      * @throws IOException
175      */
176     public HFileBlock seekToDataBlock(final byte[] key, int keyOffset,
177         int keyLength, HFileBlock currentBlock, boolean cacheBlocks,
178         boolean pread, boolean isCompaction)
179         throws IOException {
180       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, keyOffset, keyLength,
181           currentBlock, cacheBlocks, pread, isCompaction);
182       if (blockWithScanInfo == null) {
183         return null;
184       } else {
185         return blockWithScanInfo.getHFileBlock();
186       }
187     }
188 
189     /**
190      * Return the BlockWithScanInfo which contains the DataBlock with other scan info
191      * such as nextIndexedKey.
192      * This function will only be called when the HFile version is larger than 1.
193      *
194      * @param key the key we are looking for
195      * @param keyOffset the offset of the key in its byte array
196      * @param keyLength the length of the key
197      * @param currentBlock the current block, to avoid re-reading the same
198      *          block
199      * @param cacheBlocks whether to cache blocks read as part of this lookup
200      * @param pread whether to use a positional ("pread") read
201      * @param isCompaction whether this lookup is being done on behalf of a compaction
202      * @return the BlockWithScanInfo which contains the DataBlock with other scan info
203      *         such as nextIndexedKey.
204      * @throws IOException
205      */
206     public BlockWithScanInfo loadDataBlockWithScanInfo(final byte[] key, int keyOffset,
207         int keyLength, HFileBlock currentBlock, boolean cacheBlocks,
208         boolean pread, boolean isCompaction)
209         throws IOException {
210       int rootLevelIndex = rootBlockContainingKey(key, keyOffset, keyLength);
211       if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
212         return null;
213       }
214 
215       // the next indexed key
216       byte[] nextIndexedKey = null;
217 
218       // Read the next-level (intermediate or leaf) index block.
219       long currentOffset = blockOffsets[rootLevelIndex];
220       int currentOnDiskSize = blockDataSizes[rootLevelIndex];
221 
222       if (rootLevelIndex < blockKeys.length - 1) {
223         nextIndexedKey = blockKeys[rootLevelIndex + 1];
224       } else {
225         nextIndexedKey = HConstants.NO_NEXT_INDEXED_KEY;
226       }
227 
228       int lookupLevel = 1; // How many levels deep we are in our lookup.
229       int index = -1;
230 
231       HFileBlock block;
232       while (true) {
233 
234         if (currentBlock != null && currentBlock.getOffset() == currentOffset)
235         {
236           // Avoid reading the same block again, even with caching turned off.
237         // This is crucial for compaction-type workloads, which might have
238           // caching turned off. This is like a one-block cache inside the
239           // scanner.
240           block = currentBlock;
241         } else {
242           // Call HFile's caching block reader API. We always cache index
243           // blocks, otherwise we might get terrible performance.
244           boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
245           BlockType expectedBlockType;
246           if (lookupLevel < searchTreeLevel - 1) {
247             expectedBlockType = BlockType.INTERMEDIATE_INDEX;
248           } else if (lookupLevel == searchTreeLevel - 1) {
249             expectedBlockType = BlockType.LEAF_INDEX;
250           } else {
251             // this also accounts for ENCODED_DATA
252             expectedBlockType = BlockType.DATA;
253           }
254           block = cachingBlockReader.readBlock(currentOffset,
255               currentOnDiskSize, shouldCache, pread, isCompaction,
256               expectedBlockType);
257         }
258 
259         if (block == null) {
260           throw new IOException("Failed to read block at offset " +
261               currentOffset + ", onDiskSize=" + currentOnDiskSize);
262         }
263 
264         // Found a data block, break the loop and check our level in the tree.
265         if (block.getBlockType().equals(BlockType.DATA) ||
266             block.getBlockType().equals(BlockType.ENCODED_DATA)) {
267           break;
268         }
269 
270         // Not a data block. This must be a leaf-level or intermediate-level
271         // index block. We don't allow going deeper than searchTreeLevel.
272         if (++lookupLevel > searchTreeLevel) {
273           throw new IOException("Search Tree Level overflow: lookupLevel="+
274               lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
275         }
276 
277         // Locate the entry corresponding to the given key in the non-root
278         // (leaf or intermediate-level) index block.
279         ByteBuffer buffer = block.getBufferWithoutHeader();
280         index = locateNonRootIndexEntry(buffer, key, keyOffset, keyLength, comparator);
281         if (index == -1) {
282           throw new IOException("The key "
283               + Bytes.toStringBinary(key, keyOffset, keyLength)
284               + " is before the" + " first key of the non-root index block "
285               + block);
286         }
287 
288         currentOffset = buffer.getLong();
289         currentOnDiskSize = buffer.getInt();
290 
291         // Only update next indexed key if there is a next indexed key in the current level
292         byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1);
293         if (tmpNextIndexedKey != null) {
294           nextIndexedKey = tmpNextIndexedKey;
295         }
296       }
297 
298       if (lookupLevel != searchTreeLevel) {
299         throw new IOException("Reached a data block at level " + lookupLevel +
300             " but the number of levels is " + searchTreeLevel);
301       }
302 
303       // set the next indexed key for the current block.
304       BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
305       return blockWithScanInfo;
306     }
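        // Worked example (illustrative): with searchTreeLevel = 3, the loop above
        // reads one INTERMEDIATE_INDEX block (lookupLevel 1), then one LEAF_INDEX
        // block (lookupLevel 2), and finally the DATA block (lookupLevel 3), at
        // which point lookupLevel == searchTreeLevel and the sanity check passes.
        // With searchTreeLevel = 2 the intermediate step is skipped entirely.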
307 
308     /**
309      * An approximation to the {@link HFile}'s mid-key. Operates on block
310      * boundaries, and does not go inside blocks. In other words, returns the
311      * first key of the middle block of the file.
312      *
313      * @return the first key of the middle block
314      */
315     public byte[] midkey() throws IOException {
316       if (rootCount == 0)
317         throw new IOException("HFile empty");
318 
319       byte[] midKey = this.midKey.get();
320       if (midKey != null)
321         return midKey;
322 
323       if (midLeafBlockOffset >= 0) {
324         if (cachingBlockReader == null) {
325           throw new IOException("Have to read the middle leaf block but " +
326               "no block reader available");
327         }
328 
329         // Caching, using pread, assuming this is not a compaction.
330         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
331             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false,
332             BlockType.LEAF_INDEX);
333 
334         ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
335         int numDataBlocks = b.getInt();
336         int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
337         int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
338             keyRelOffset;
339         int keyOffset = b.arrayOffset() +
340             Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset +
341             SECONDARY_INDEX_ENTRY_OVERHEAD;
342         midKey = Arrays.copyOfRange(b.array(), keyOffset, keyOffset + keyLen);
343       } else {
344         // The middle of the root-level index.
345         midKey = blockKeys[rootCount / 2];
346       }
347 
348       this.midKey.set(midKey);
349       return midKey;
350     }
351 
352     /**
353      * @param i from 0 to {@link #getRootBlockCount()} - 1
354      */
355     public byte[] getRootBlockKey(int i) {
356       return blockKeys[i];
357     }
358 
359     /**
360      * @param i from 0 to {@link #getRootBlockCount()} - 1
361      */
362     public long getRootBlockOffset(int i) {
363       return blockOffsets[i];
364     }
365 
366     /**
367      * @param i zero-based index of a root-level block
368      * @return the on-disk size of the root-level block for version 2, or the
369      *         uncompressed size for version 1
370      */
371     public int getRootBlockDataSize(int i) {
372       return blockDataSizes[i];
373     }
374 
375     /**
376      * @return the number of root-level blocks in this block index
377      */
378     public int getRootBlockCount() {
379       return rootCount;
380     }
381 
382     /**
383      * Finds the root-level index block containing the given key.
384      *
385      * @param key
386      *          Key to find
387      * @return Index of the root-level block containing <code>key</code> (between
388      *         0 and the number of blocks - 1) or -1 if this file does not contain
389      *         the requested key.
390      */
391     public int rootBlockContainingKey(final byte[] key, int offset,
392         int length) {
393       int pos = Bytes.binarySearch(blockKeys, key, offset, length,
394           comparator);
395       // pos is between -(blockKeys.length + 1) to blockKeys.length - 1, see
396       // binarySearch's javadoc.
397 
398       if (pos >= 0) {
399         // This means this is an exact match with an element of blockKeys.
400         assert pos < blockKeys.length;
401         return pos;
402       }
403 
404       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
405       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
406       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
407       // key < blockKeys[0], meaning the file does not contain the given key.
408 
409       int i = -pos - 1;
410       assert 0 <= i && i <= blockKeys.length;
411       return i - 1;
412     }
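        // Worked example (illustrative): suppose blockKeys = {"c", "f", "k"}.
        // Looking up "g": binarySearch returns -3 (insertion point 2), so
        // i = -(-3) - 1 = 2 and we return 1, i.e. the block whose first key is "f".
        // Looking up "a": binarySearch returns -1 (insertion point 0), so we return
        // -1, meaning the key is before the first block of the file.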
413 
414     /**
415      * Adds a new entry to the root block index. Only used when reading.
416      *
417      * @param key the first key of the block
418      * @param offset file offset where the block is stored
419      * @param dataSize the block's on-disk size (version 2) or uncompressed size (version 1)
420      */
421     private void add(final byte[] key, final long offset, final int dataSize) {
422       blockOffsets[rootCount] = offset;
423       blockKeys[rootCount] = key;
424       blockDataSizes[rootCount] = dataSize;
425 
426       rootCount++;
427       rootByteSize += SECONDARY_INDEX_ENTRY_OVERHEAD + key.length;
428     }
429 
430     /**
431      * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
432      * @param nonRootIndex the non-root index block buffer
433      * @param i the ith position (zero-based)
434      * @return The indexed key at the ith position in the nonRootIndex.
435      */
436     private byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
437       int numEntries = nonRootIndex.getInt(0);
438       if (i < 0 || i >= numEntries) {
439         return null;
440       }
441 
442       // Entries start after the number of entries and the secondary index.
443       // The secondary index takes numEntries + 1 ints.
444       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
445       // Target key's offset relative to the end of the secondary index
446       int targetKeyRelOffset = nonRootIndex.getInt(
447           Bytes.SIZEOF_INT * (i + 1));
448 
449       // The offset of the target key in the blockIndex buffer
450       int targetKeyOffset = entriesOffset     // Skip secondary index
451           + targetKeyRelOffset               // Skip all entries until mid
452           + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
453 
454       // We subtract the two consecutive secondary index elements, which
455       // gives us the size of the whole (offset, onDiskSize, key) tuple. We
456       // then need to subtract the overhead of offset and onDiskSize.
457       int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
458         targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
459 
460       int from = nonRootIndex.arrayOffset() + targetKeyOffset;
461       int to = from + targetKeyLength;
462       return Arrays.copyOfRange(nonRootIndex.array(), from, to);
463     }
464 
465     /**
466      * Performs a binary search over a non-root level index block. Utilizes the
467      * secondary index, which records the offsets of (offset, onDiskSize,
468      * firstKey) tuples of all entries.
469      *
470      * @param key the key we are searching for offsets to individual entries in
471      *          the blockIndex buffer
472      * @param keyOffset the offset of the key in its byte array
473      * @param keyLength the length of the key
474      * @param nonRootIndex the non-root index block buffer, starting with the
475      *          secondary index. The position is ignored.
476      * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
477      *         keys[i + 1], if keys is the array of all keys being searched, or
478      *         -1 if the given key is before the first key
479      * @throws IOException
480      */
481     static int binarySearchNonRootIndex(byte[] key, int keyOffset,
482         int keyLength, ByteBuffer nonRootIndex,
483         RawComparator<byte[]> comparator) {
484 
485       int numEntries = nonRootIndex.getInt(0);
486       int low = 0;
487       int high = numEntries - 1;
488       int mid = 0;
489 
490       // Entries start after the number of entries and the secondary index.
491       // The secondary index takes numEntries + 1 ints.
492       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
493 
494       // If we imagine that keys[-1] = -Infinity and
495       // keys[numEntries] = Infinity, then we are maintaining an invariant that
496       // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
497 
498       while (low <= high) {
499         mid = (low + high) >>> 1;
500 
501         // Midkey's offset relative to the end of secondary index
502         int midKeyRelOffset = nonRootIndex.getInt(
503             Bytes.SIZEOF_INT * (mid + 1));
504 
505         // The offset of the middle key in the blockIndex buffer
506         int midKeyOffset = entriesOffset       // Skip secondary index
507             + midKeyRelOffset                  // Skip all entries until mid
508             + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
509 
510         // We subtract the two consecutive secondary index elements, which
511         // gives us the size of the whole (offset, onDiskSize, key) tuple. We
512         // then need to subtract the overhead of offset and onDiskSize.
513         int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
514             midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
515 
516         // we have to compare in this order, because the comparator order
517         // has special logic when the 'left side' is a special key.
518         int cmp = comparator.compare(key, keyOffset, keyLength,
519             nonRootIndex.array(), nonRootIndex.arrayOffset() + midKeyOffset,
520             midLength);
521 
522         // key lives above the midpoint
523         if (cmp > 0)
524           low = mid + 1; // Maintain the invariant that keys[low - 1] < key
525         // key lives below the midpoint
526         else if (cmp < 0)
527           high = mid - 1; // Maintain the invariant that key < keys[high + 1]
528         else
529           return mid; // exact match
530       }
531 
532       // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
533       // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
534       // condition, low >= high + 1. Therefore, low = high + 1.
535 
536       if (low != high + 1) {
537         throw new IllegalStateException("Binary search broken: low=" + low
538             + " " + "instead of " + (high + 1));
539       }
540 
541       // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
542       // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
543       int i = low - 1;
544 
545       // Some extra validation on the result.
546       if (i < -1 || i >= numEntries) {
547         throw new IllegalStateException("Binary search broken: result is " +
548             i + " but expected to be between -1 and (numEntries - 1) = " +
549             (numEntries - 1));
550       }
551 
552       return i;
553     }
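        // Worked arithmetic example (illustrative): for numEntries = 4 and mid = 1,
        // the secondary index entry for the middle key lives at byte offset
        // SIZEOF_INT * (1 + 1) = 8, entriesOffset = SIZEOF_INT * (4 + 2) = 24, and
        // the key bytes start at 24 + midKeyRelOffset + SECONDARY_INDEX_ENTRY_OVERHEAD,
        // i.e. 12 bytes past the start of the (offset, onDiskSize, key) tuple.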
554 
555     /**
556      * Search for one key using the secondary index in a non-root block. In case
557      * of success, positions the provided buffer at the entry of interest, where
558      * the file offset and the on-disk-size can be read.
559      *
560      * @param nonRootBlock a non-root block without header. Initial position
561      *          does not matter.
562      * @param key the byte array containing the key
563      * @param keyOffset the offset of the key in its byte array
564      * @param keyLength the length of the key
565      * @return the index position where the given key was found,
566      *         or -1 if the given key is before the first key.
567      *
568      */
569     static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, byte[] key,
570         int keyOffset, int keyLength, RawComparator<byte[]> comparator) {
571       int entryIndex = binarySearchNonRootIndex(key, keyOffset, keyLength,
572           nonRootBlock, comparator);
573 
574       if (entryIndex != -1) {
575         int numEntries = nonRootBlock.getInt(0);
576 
577         // The end of secondary index and the beginning of entries themselves.
578         int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
579 
580         // The offset of the entry we are interested in relative to the end of
581         // the secondary index.
582         int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT
583             * (1 + entryIndex));
584 
585         nonRootBlock.position(entriesOffset + entryRelOffset);
586       }
587 
588       return entryIndex;
589     }
590 
591     /**
592      * Read in the root-level index from the given input stream. Must match
593      * what was written into the root level by
594      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
595      * offset that function returned.
596      *
597      * @param in the buffered input stream or wrapped byte input stream
598      * @param numEntries the number of root-level index entries
599      * @throws IOException
600      */
601     public void readRootIndex(DataInput in, final int numEntries)
602         throws IOException {
603       blockOffsets = new long[numEntries];
604       blockKeys = new byte[numEntries][];
605       blockDataSizes = new int[numEntries];
606 
607       // If index size is zero, no index was written.
608       if (numEntries > 0) {
609         for (int i = 0; i < numEntries; ++i) {
610           long offset = in.readLong();
611           int dataSize = in.readInt();
612           byte[] key = Bytes.readByteArray(in);
613           add(key, offset, dataSize);
614         }
615       }
616     }
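        // Sketch of the root-level index layout read above (added for clarity; it
        // mirrors BlockIndexChunk#writeRoot): for each of numEntries entries,
        //   long   block offset
        //   int    on-disk size (version 2) or uncompressed size (version 1)
        //   vint-prefixed byte[] key  (written with Bytes.writeByteArray)
        // In a multi-level index this may be followed by the optional 16-byte
        // mid-key metadata handled in readMultiLevelIndexRoot below.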
617     
618     /**
619      * Read in the root-level index from the given block. Must match
620      * what was written into the root level by
621      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
622      * offset that function returned.
623      *
624      * @param blk the HFile block
625      * @param numEntries the number of root-level index entries
626      * @return the input stream positioned right after the root-level index, from which additional metadata (if any) can be read
627      * @throws IOException
628      */
629     public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
630       DataInputStream in = blk.getByteStream();
631       readRootIndex(in, numEntries);
632       return in;
633     }
634 
635     /**
636      * Read the root-level metadata of a multi-level block index. Based on
637      * {@link #readRootIndex(DataInput, int)}, but also reads metadata
638      * necessary to compute the mid-key in a multi-level index.
639      *
640      * @param blk the HFile block
641      * @param numEntries the number of root-level index entries
642      * @throws IOException
643      */
644     public void readMultiLevelIndexRoot(HFileBlock blk,
645         final int numEntries) throws IOException {
646       DataInputStream in = readRootIndex(blk, numEntries);
647       // After reading the root index, subtract the trailing checksum bytes
648       // to determine whether any mid-key metadata is present.
649       int checkSumBytes = blk.totalChecksumBytes();
650       if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
651         // No mid-key metadata available.
652         return;
653       }
654       midLeafBlockOffset = in.readLong();
655       midLeafBlockOnDiskSize = in.readInt();
656       midKeyEntry = in.readInt();
657     }
658 
659     @Override
660     public String toString() {
661       StringBuilder sb = new StringBuilder();
662       sb.append("size=" + rootCount).append("\n");
663       for (int i = 0; i < rootCount; i++) {
664         sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
665             .append("\n  offset=").append(blockOffsets[i])
666             .append(", dataSize=" + blockDataSizes[i]).append("\n");
667       }
668       return sb.toString();
669     }
670 
671     @Override
672     public long heapSize() {
673       long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
674           3 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
675 
676       // Mid-key metadata.
677       heapSize += MID_KEY_METADATA_SIZE;
678 
679       // Calculating the size of blockKeys
680       if (blockKeys != null) {
681         // Adding array + references overhead
682         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
683             * ClassSize.REFERENCE);
684 
685         // Adding bytes
686         for (byte[] key : blockKeys) {
687           heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
688         }
689       }
690 
691       if (blockOffsets != null) {
692         heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
693             * Bytes.SIZEOF_LONG);
694       }
695 
696       if (blockDataSizes != null) {
697         heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
698             * Bytes.SIZEOF_INT);
699       }
700 
701       return ClassSize.align(heapSize);
702     }
703 
704   }
705 
706   /**
707    * Writes the block index into the output stream. Generates the tree from the
708    * bottom up. The leaf level is written to disk as a sequence of inline
709    * blocks, if it is larger than a certain number of bytes. If the leaf level
710    * is not large enough, we write all entries to the root level instead.
711    *
712    * After all leaf blocks have been written, we end up with an index
713    * referencing the resulting leaf index blocks. If that index is larger than
714    * the allowed root index size, the writer will break it up into
715    * reasonable-size intermediate-level index block chunks, write those chunks
716    * out, and create another index referencing those chunks. This will be
717    * repeated until the remaining index is small enough to become the root
718    * index. However, in most practical cases we will only have leaf-level
719    * blocks and the root index, or just the root index.
720    */
721   public static class BlockIndexWriter extends SchemaConfigured
722       implements InlineBlockWriter {
723     /**
724      * While the index is being written, this represents the current block
725      * index referencing all leaf blocks, with one exception. If the file is
726      * being closed and there are not enough blocks to complete even a single
727      * leaf block, no leaf blocks get written and this contains the entire
728      * block index. After all levels of the index have been written by
729      * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
730      * root-level index.
731      */
732     private BlockIndexChunk rootChunk = new BlockIndexChunk();
733 
734     /**
735      * Current leaf-level chunk. New entries referencing data blocks get added
736      * to this chunk until it grows large enough to be written to disk.
737      */
738     private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
739 
740     /**
741      * The number of block index levels. This is one if there is only root
742      * level (even empty), two if there is a leaf level and a root level, and is
743      * higher if there are intermediate levels. This is only final after
744      * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
745      * initial value accounts for the root level, and will be increased to two
746      * as soon as we find out there is a leaf-level in
747      * {@link #blockWritten(long, int, int)}.
748      */
749     private int numLevels = 1;
750 
751     private HFileBlock.Writer blockWriter;
752     private byte[] firstKey = null;
753 
754     /**
755      * The total number of leaf-level entries, i.e. entries referenced by
756      * leaf-level blocks. For the data block index this is equal to the number
757      * of data blocks.
758      */
759     private long totalNumEntries;
760 
761     /** Total compressed size of all index blocks. */
762     private long totalBlockOnDiskSize;
763 
764     /** Total uncompressed size of all index blocks. */
765     private long totalBlockUncompressedSize;
766 
767     /** The maximum size guideline of all multi-level index blocks. */
768     private int maxChunkSize;
769 
770     /** Whether we require this block index to always be single-level. */
771     private boolean singleLevelOnly;
772 
773     /** Block cache, or null if cache-on-write is disabled */
774     private BlockCache blockCache;
775 
776     /** Name to use for computing cache keys */
777     private String nameForCaching;
778 
779     /** Creates a single-level block index writer */
780     public BlockIndexWriter() {
781       this(null, null, null);
782       singleLevelOnly = true;
783     }
784 
785     /**
786      * Creates a multi-level block index writer.
787      *
788      * @param blockWriter the block writer to use to write index blocks
789      * @param blockCache if this is not null, index blocks will be cached
790      *    on write into this block cache.
791      */
792     public BlockIndexWriter(HFileBlock.Writer blockWriter,
793         BlockCache blockCache, String nameForCaching) {
794       if ((blockCache == null) != (nameForCaching == null)) {
795         throw new IllegalArgumentException("Block cache and file name for " +
796             "caching must be both specified or both null");
797       }
798 
799       this.blockWriter = blockWriter;
800       this.blockCache = blockCache;
801       this.nameForCaching = nameForCaching;
802       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
803     }
804 
805     public void setMaxChunkSize(int maxChunkSize) {
806       if (maxChunkSize <= 0) {
807         throw new IllegalArgumentException("Invalid maximum index block size");
808       }
809       this.maxChunkSize = maxChunkSize;
810     }
811 
812     /**
813      * Writes the root level and intermediate levels of the block index into
814      * the output stream, generating the tree from bottom up. Assumes that the
815      * leaf level has been inline-written to the disk if there is enough data
816      * for more than one leaf block. We iterate by breaking the current level
817      * of the block index, starting with the index of all leaf-level blocks,
818      * into chunks small enough to be written to disk, and generate its parent
819      * level, until we end up with a level small enough to become the root
820      * level.
821      *
822      * If the leaf level is not large enough, there is no inline block index
823      * anymore, so we only write that level of block index to disk as the root
824      * level.
825      *
826      * @param out FSDataOutputStream
827      * @return position at which we entered the root-level index.
828      * @throws IOException
829      */
830     public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
831       if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
832         throw new IOException("Trying to write a multi-level block index, " +
833             "but there are " + curInlineChunk.getNumEntries() + " entries in the " +
834             "last inline chunk.");
835       }
836 
837       // We need to get mid-key metadata before we create intermediate
838       // indexes and overwrite the root chunk.
839       byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
840           : null;
841 
842       if (curInlineChunk != null) {
843         while (rootChunk.getRootSize() > maxChunkSize) {
844           rootChunk = writeIntermediateLevel(out, rootChunk);
845           numLevels += 1;
846         }
847       }
848 
849       // write the root level
850       long rootLevelIndexPos = out.getPos();
851 
852       {
853         DataOutput blockStream =
854             blockWriter.startWriting(BlockType.ROOT_INDEX);
855         rootChunk.writeRoot(blockStream);
856         if (midKeyMetadata != null)
857           blockStream.write(midKeyMetadata);
858         blockWriter.writeHeaderAndData(out);
859       }
860 
861       // Add root index block size
862       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
863       totalBlockUncompressedSize +=
864           blockWriter.getUncompressedSizeWithoutHeader();
865 
866       if (LOG.isTraceEnabled()) {
867         LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
868           + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
869           + " root-level entries, " + totalNumEntries + " total entries, "
870           + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
871           " on-disk size, "
872           + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
873           " total uncompressed size.");
874       }
875       return rootLevelIndexPos;
876     }
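        // Minimal usage sketch (illustrative, assuming the caller mirrors what
        // HFileWriterV2 does; variable names are hypothetical): after all data
        // blocks and inline leaf index blocks have been written,
        //   long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
        // and the returned offset is then recorded (e.g. in the file trailer) so
        // that readers can locate the root-level index when opening the file.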
877 
878     /**
879      * Writes the block index data as a single level only. Does not do any
880      * block framing.
881      *
882      * @param out the buffered output stream to write the index to. Typically a
883      *          stream writing into an {@link HFile} block.
884      * @param description a short description of the index being written. Used
885      *          in a log message.
886      * @throws IOException
887      */
888     public void writeSingleLevelIndex(DataOutput out, String description)
889         throws IOException {
890       expectNumLevels(1);
891 
892       if (!singleLevelOnly)
893         throw new IOException("Single-level mode is turned off");
894 
895       if (rootChunk.getNumEntries() > 0)
896         throw new IOException("Root-level entries already added in " +
897             "single-level mode");
898 
899       rootChunk = curInlineChunk;
900       curInlineChunk = new BlockIndexChunk();
901 
902       if (LOG.isTraceEnabled()) {
903         LOG.trace("Wrote a single-level " + description + " index with "
904           + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
905           + " bytes");
906       }
907       rootChunk.writeRoot(out);
908     }
909 
910     /**
911      * Split the current level of the block index into intermediate index
912      * blocks of permitted size and write those blocks to disk. Return the next
913      * level of the block index referencing those intermediate-level blocks.
914      *
915      * @param out the output stream to write the intermediate-level index blocks to
916      * @param currentLevel the current level of the block index, such as the
917      *          chunk referencing all leaf-level index blocks
918      * @return the parent level block index, which becomes the root index after
919      *         a few (usually zero) iterations
920      * @throws IOException
921      */
922     private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
923         BlockIndexChunk currentLevel) throws IOException {
924       // Entries referencing intermediate-level blocks we are about to create.
925       BlockIndexChunk parent = new BlockIndexChunk();
926 
927       // The current intermediate-level block index chunk.
928       BlockIndexChunk curChunk = new BlockIndexChunk();
929 
930       for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
931         curChunk.add(currentLevel.getBlockKey(i),
932             currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
933 
934         if (curChunk.getRootSize() >= maxChunkSize)
935           writeIntermediateBlock(out, parent, curChunk);
936       }
937 
938       if (curChunk.getNumEntries() > 0) {
939         writeIntermediateBlock(out, parent, curChunk);
940       }
941 
942       return parent;
943     }
944 
945     private void writeIntermediateBlock(FSDataOutputStream out,
946         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
947       long beginOffset = out.getPos();
948       DataOutputStream dos = blockWriter.startWriting(
949           BlockType.INTERMEDIATE_INDEX);
950       curChunk.writeNonRoot(dos);
951       byte[] curFirstKey = curChunk.getBlockKey(0);
952       blockWriter.writeHeaderAndData(out);
953 
954       if (blockCache != null) {
955         HFileBlock blockForCaching = blockWriter.getBlockForCaching();
956         passSchemaMetricsTo(blockForCaching);
957         blockCache.cacheBlock(new BlockCacheKey(nameForCaching,
958             beginOffset, DataBlockEncoding.NONE, 
959             blockForCaching.getBlockType()), blockForCaching);
960       }
961 
962       // Add intermediate index block size
963       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
964       totalBlockUncompressedSize +=
965           blockWriter.getUncompressedSizeWithoutHeader();
966 
967       // OFFSET is the beginning offset of the chunk of block index entries.
968       // SIZE is the total byte size of the chunk of block index entries
969       // + the secondary index size
970       // FIRST_KEY is the first key in the chunk of block index
971       // entries.
972       parent.add(curFirstKey, beginOffset,
973           blockWriter.getOnDiskSizeWithHeader());
974 
975       // clear current block index chunk
976       curChunk.clear();
977       curFirstKey = null;
978     }
979 
980     /**
981      * @return how many block index entries there are in the root level
982      */
983     public final int getNumRootEntries() {
984       return rootChunk.getNumEntries();
985     }
986 
987     /**
988      * @return the number of levels in this block index.
989      */
990     public int getNumLevels() {
991       return numLevels;
992     }
993 
994     private void expectNumLevels(int expectedNumLevels) {
995       if (numLevels != expectedNumLevels) {
996         throw new IllegalStateException("Number of block index levels is "
997             + numLevels + " but is expected to be " + expectedNumLevels);
998       }
999     }
1000 
1001     /**
1002      * Whether there is an inline block ready to be written. In general, we
1003      * write a leaf-level index block as an inline block as soon as its size
1004      * as serialized in the non-root format reaches a certain threshold.
1005      */
1006     @Override
1007     public boolean shouldWriteBlock(boolean closing) {
1008       if (singleLevelOnly) {
1009         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1010       }
1011 
1012       if (curInlineChunk == null) {
1013         throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
1014             "called with closing=true and then called again?");
1015       }
1016 
1017       if (curInlineChunk.getNumEntries() == 0) {
1018         return false;
1019       }
1020 
1021       // We do have some entries in the current inline chunk.
1022       if (closing) {
1023         if (rootChunk.getNumEntries() == 0) {
1024           // We did not add any leaf-level blocks yet. Instead of creating a
1025           // leaf level with one block, move these entries to the root level.
1026 
1027           expectNumLevels(1);
1028           rootChunk = curInlineChunk;
1029           curInlineChunk = null;  // Disallow adding any more index entries.
1030           return false;
1031         }
1032 
1033         return true;
1034       } else {
1035         return curInlineChunk.getNonRootSize() >= maxChunkSize;
1036       }
1037     }
1038 
1039     /**
1040      * Write out the current inline index block. Inline blocks are non-root
1041      * blocks, so the non-root index format is used.
1042      *
1043      * @param out the output stream to write the inline index block to
1044      */
1045     @Override
1046     public void writeInlineBlock(DataOutput out) throws IOException {
1047       if (singleLevelOnly)
1048         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1049 
1050       // Write the inline block index to the output stream in the non-root
1051       // index block format.
1052       curInlineChunk.writeNonRoot(out);
1053 
1054       // Save the first key of the inline block so that we can add it to the
1055       // parent-level index.
1056       firstKey = curInlineChunk.getBlockKey(0);
1057 
1058       // Start a new inline index block
1059       curInlineChunk.clear();
1060     }
1061 
1062     /**
1063      * Called after an inline block has been written so that we can add an
1064      * entry referring to that block to the parent-level index.
1065      */
1066     @Override
1067     public void blockWritten(long offset, int onDiskSize, int uncompressedSize)
1068     {
1069       // Add leaf index block size
1070       totalBlockOnDiskSize += onDiskSize;
1071       totalBlockUncompressedSize += uncompressedSize;
1072 
1073       if (singleLevelOnly)
1074         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1075 
1076       if (firstKey == null) {
1077         throw new IllegalStateException("Trying to add second-level index " +
1078             "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
1079            " but the first key was not set in writeInlineBlock");
1080       }
1081 
1082       if (rootChunk.getNumEntries() == 0) {
1083         // We are writing the first leaf block, so increase index level.
1084         expectNumLevels(1);
1085         numLevels = 2;
1086       }
1087 
1088       // Add another entry to the second-level index. Include the number of
1089       // entries in all previous leaf-level chunks for mid-key calculation.
1090       rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1091       firstKey = null;
1092     }
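        // Rough call sequence for the inline-block protocol (illustrative, not a
        // guarantee of the enclosing writer's exact behaviour): the HFile writer
        // periodically asks shouldWriteBlock(false); when it returns true it calls
        // writeInlineBlock(out) to serialize the current leaf chunk, and then
        // blockWritten(offset, onDiskSize, uncompressedSize) with the position and
        // sizes of the block just written, which adds the leaf block as an entry
        // of the root/parent chunk here.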
1093 
1094     @Override
1095     public BlockType getInlineBlockType() {
1096       return BlockType.LEAF_INDEX;
1097     }
1098 
1099     /**
1100      * Add one index entry to the current leaf-level block. When the leaf-level
1101      * block gets large enough, it will be flushed to disk as an inline block.
1102      *
1103      * @param firstKey the first key of the data block
1104      * @param blockOffset the offset of the data block
1105      * @param blockDataSize the on-disk size of the data block ({@link HFile}
1106      *          format version 2), or the uncompressed size of the data block (
1107      *          {@link HFile} format version 1).
1108      */
1109     public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize)
1110     {
1111       curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1112       ++totalNumEntries;
1113     }
1114 
1115     /**
1116      * @throws IOException if we happened to write a multi-level index.
1117      */
1118     public void ensureSingleLevel() throws IOException {
1119       if (numLevels > 1) {
1120         throw new IOException ("Wrote a " + numLevels + "-level index with " +
1121             rootChunk.getNumEntries() + " root-level entries, but " +
1122             "this is expected to be a single-level block index.");
1123       }
1124     }
1125 
1126     /**
1127      * @return true if we are using cache-on-write. This is configured by the
1128      *         caller of the constructor by either passing a valid block cache
1129      *         or null.
1130      */
1131     @Override
1132     public boolean cacheOnWrite() {
1133       return blockCache != null;
1134     }
1135 
1136     /**
1137      * The total uncompressed size of the root index block, intermediate-level
1138      * index blocks, and leaf-level index blocks.
1139      *
1140      * @return the total uncompressed size of all index blocks
1141      */
1142     public long getTotalUncompressedSize() {
1143       return totalBlockUncompressedSize;
1144     }
1145 
1146   }
1147 
1148   /**
1149    * A single chunk of the block index in the process of writing. The data in
1150    * this chunk can become a leaf-level, intermediate-level, or root index
1151    * block.
1152    */
1153   static class BlockIndexChunk {
1154 
1155     /** First keys of the key range corresponding to each index entry. */
1156     private final List<byte[]> blockKeys = new ArrayList<byte[]>();
1157 
1158     /** Block offset in backing stream. */
1159     private final List<Long> blockOffsets = new ArrayList<Long>();
1160 
1161     /** On-disk data sizes of lower-level data or index blocks. */
1162     private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
1163 
1164     /**
1165      * The cumulative number of sub-entries, i.e. entries referenced by the
1166      * deeper-level blocks. numSubEntriesAt[i] is the number of sub-entries in
1167      * the blocks corresponding to this chunk's entries #0 through #i inclusive.
1168      */
1169     private final List<Long> numSubEntriesAt = new ArrayList<Long>();
1170 
1171     /**
1172      * The offset of the next entry to be added, relative to the end of the
1173      * "secondary index" in the "non-root" format representation of this index
1174      * chunk. This is the next value to be added to the secondary index.
1175      */
1176     private int curTotalNonRootEntrySize = 0;
1177 
1178     /**
1179      * The accumulated size of this chunk if stored in the root index format.
1180      */
1181     private int curTotalRootSize = 0;
1182 
1183     /**
1184      * The "secondary index" used for binary search over variable-length
1185      * records in a "non-root" format block. These offsets are relative to the
1186      * end of this secondary index.
1187      */
1188     private final List<Integer> secondaryIndexOffsetMarks =
1189         new ArrayList<Integer>();
1190 
1191     /**
1192      * Adds a new entry to this block index chunk.
1193      *
1194      * @param firstKey the first key in the block pointed to by this entry
1195      * @param blockOffset the offset of the next-level block pointed to by this
1196      *          entry
1197      * @param onDiskDataSize the on-disk data size of the block pointed to by this
1198      *          entry, including header size
1199      * @param curTotalNumSubEntries if this chunk is the root index chunk under
1200      *          construction, this specifies the current total number of
1201      *          sub-entries in all leaf-level chunks, including the one
1202      *          corresponding to the second-level entry being added.
1203      */
1204     void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1205         long curTotalNumSubEntries) {
1206       // Record the offset for the secondary index
1207       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1208       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1209           + firstKey.length;
1210 
1211       curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1212           + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1213 
1214       blockKeys.add(firstKey);
1215       blockOffsets.add(blockOffset);
1216       onDiskDataSizes.add(onDiskDataSize);
1217 
1218       if (curTotalNumSubEntries != -1) {
1219         numSubEntriesAt.add(curTotalNumSubEntries);
1220 
1221         // Make sure the parallel arrays are in sync.
1222         if (numSubEntriesAt.size() != blockKeys.size()) {
1223           throw new IllegalStateException("Only have key/value count " +
1224               "stats for " + numSubEntriesAt.size() + " block index " +
1225               "entries out of " + blockKeys.size());
1226         }
1227       }
1228     }
1229 
1230     /**
1231      * The same as {@link #add(byte[], long, int, long)} but does not take the
1232      * key/value into account. Used for single-level indexes.
1233      *
1234      * @see #add(byte[], long, int, long)
1235      */
1236     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1237       add(firstKey, blockOffset, onDiskDataSize, -1);
1238     }
1239 
1240     public void clear() {
1241       blockKeys.clear();
1242       blockOffsets.clear();
1243       onDiskDataSizes.clear();
1244       secondaryIndexOffsetMarks.clear();
1245       numSubEntriesAt.clear();
1246       curTotalNonRootEntrySize = 0;
1247       curTotalRootSize = 0;
1248     }
1249 
1250     /**
1251      * Finds the entry corresponding to the deeper-level index block containing
1252      * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1253      * ordering of sub-entries.
1254      *
1255      * <p>
1256      * <i> Implementation note. </i> We are looking for i such that
1257      * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
1258      * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
1259      * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
1260      * sub-entries. i is by definition the insertion point of k in
1261      * numSubEntriesAt.
1262      *
1263      * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1264      * @return the 0-based index of the entry corresponding to the given
1265      *         sub-entry
1266      */
1267     public int getEntryBySubEntry(long k) {
1268       // We define mid-key as the key corresponding to k'th sub-entry
1269       // (0-based).
1270 
1271       int i = Collections.binarySearch(numSubEntriesAt, k);
1272 
1273       // Exact match: cumulativeWeight[i] = k. This means chunks #0 through
1274       // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1275       // is in the (i + 1)'th chunk.
1276       if (i >= 0)
1277         return i + 1;
1278 
1279       // Inexact match. Return the insertion point.
1280       return -i - 1;
1281     }
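        // Worked example (illustrative): with numSubEntriesAt = [3, 7, 10], chunk #0
        // covers sub-entries 0..2, chunk #1 covers 3..6 and chunk #2 covers 7..9.
        // getEntryBySubEntry(5) -> binarySearch misses with insertion point 1, so we
        // return 1; getEntryBySubEntry(7) -> exact match at index 1, so we return 2.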
1282 
1283     /**
1284      * Used when writing the root block index of a multi-level block index.
1285      * Serializes additional information that allows the mid-key to be
1286      * identified efficiently.
1287      *
1288      * @return a few serialized fields for finding the mid-key
1289      * @throws IOException if the metadata for computing the mid-key could not be created
1290      */
1291     public byte[] getMidKeyMetadata() throws IOException {
1292       ByteArrayOutputStream baos = new ByteArrayOutputStream(
1293           MID_KEY_METADATA_SIZE);
1294       DataOutputStream baosDos = new DataOutputStream(baos);
1295       long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
1296       if (totalNumSubEntries == 0) {
1297         throw new IOException("No leaf-level entries, mid-key unavailable");
1298       }
1299       long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1300       int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1301 
1302       baosDos.writeLong(blockOffsets.get(midKeyEntry));
1303       baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1304 
1305       long numSubEntriesBefore = midKeyEntry > 0
1306           ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1307       long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1308       if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE)
1309       {
1310         throw new IOException("Could not identify mid-key index within the "
1311             + "leaf-level block containing mid-key: out of range ("
1312             + subEntryWithinEntry + ", numSubEntriesBefore="
1313             + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1314             + ")");
1315       }
1316 
1317       baosDos.writeInt((int) subEntryWithinEntry);
1318 
1319       if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1320         throw new IOException("Could not write mid-key metadata: size=" +
1321             baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1322       }
1323 
1324       // Close just to be good citizens, although this has no effect.
1325       baos.close();
1326 
1327       return baos.toByteArray();
1328     }
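        // Continuing the example above (illustrative): with numSubEntriesAt =
        // [3, 7, 10] the total is 10, so midKeySubEntry = (10 - 1) / 2 = 4, which
        // falls in entry #1; numSubEntriesBefore = 3 and subEntryWithinEntry = 1.
        // The 16 bytes written are therefore blockOffsets.get(1),
        // onDiskDataSizes.get(1) and the int 1.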
1329 
1330     /**
1331      * Writes the block index chunk in the non-root index block format. This
1332      * format contains the number of entries, an index of integer offsets
1333      * for quick binary search on variable-length records, and tuples of
1334      * block offset, on-disk block size, and the first key for each entry.
1335      *
1336      * @param out the output stream to write the non-root index block to
1337      * @throws IOException
1338      */
1339     void writeNonRoot(DataOutput out) throws IOException {
1340       // The number of entries in the block.
1341       out.writeInt(blockKeys.size());
1342 
1343       if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1344         throw new IOException("Corrupted block index chunk writer: " +
1345             blockKeys.size() + " entries but " +
1346             secondaryIndexOffsetMarks.size() + " secondary index items");
1347       }
1348 
1349       // For each entry, write a "secondary index" of relative offsets to the
1350       // entries from the end of the secondary index. This works, because at
1351       // read time we read the number of entries and know where the secondary
1352       // index ends.
1353       for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
1354         out.writeInt(currentSecondaryIndex);
1355 
1356       // We include one other element in the secondary index to calculate the
1357       // size of each entry more easily by subtracting secondary index elements.
1358       out.writeInt(curTotalNonRootEntrySize);
1359 
1360       for (int i = 0; i < blockKeys.size(); ++i) {
1361         out.writeLong(blockOffsets.get(i));
1362         out.writeInt(onDiskDataSizes.get(i));
1363         out.write(blockKeys.get(i));
1364       }
1365     }
1366 
1367     /**
1368      * @return the size of this chunk if stored in the non-root index block
1369      *         format
1370      */
1371     int getNonRootSize() {
1372       return Bytes.SIZEOF_INT                          // Number of entries
1373           + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1374           + curTotalNonRootEntrySize;                  // All entries
1375     }
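        // Size arithmetic example (illustrative): two entries with keys of 5 and 7
        // bytes give curTotalNonRootEntrySize = (12 + 5) + (12 + 7) = 36, so
        // getNonRootSize() = 4 + 4 * 3 + 36 = 52 bytes, matching what writeNonRoot
        // emits: one entry count, three secondary-index ints and the two
        // (offset, onDiskSize, key) tuples.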
1376 
1377     /**
1378      * Writes this chunk into the given output stream in the root block index
1379      * format. This format is similar to the {@link HFile} version 1 block
1380      * index format, except that we store on-disk size of the block instead of
1381      * its uncompressed size.
1382      *
1383      * @param out the data output stream to write the block index to. Typically
1384      *          a stream writing into an {@link HFile} block.
1385      * @throws IOException
1386      */
1387     void writeRoot(DataOutput out) throws IOException {
1388       for (int i = 0; i < blockKeys.size(); ++i) {
1389         out.writeLong(blockOffsets.get(i));
1390         out.writeInt(onDiskDataSizes.get(i));
1391         Bytes.writeByteArray(out, blockKeys.get(i));
1392       }
1393     }
1394 
1395     /**
1396      * @return the size of this chunk if stored in the root index block format
1397      */
1398     int getRootSize() {
1399       return curTotalRootSize;
1400     }
1401 
1402     /**
1403      * @return the number of entries in this block index chunk
1404      */
1405     public int getNumEntries() {
1406       return blockKeys.size();
1407     }
1408 
1409     public byte[] getBlockKey(int i) {
1410       return blockKeys.get(i);
1411     }
1412 
1413     public long getBlockOffset(int i) {
1414       return blockOffsets.get(i);
1415     }
1416 
1417     public int getOnDiskDataSize(int i) {
1418       return onDiskDataSizes.get(i);
1419     }
1420 
1421     public long getCumulativeNumKV(int i) {
1422       if (i < 0)
1423         return 0;
1424       return numSubEntriesAt.get(i);
1425     }
1426 
1427   }
1428 
1429   public static int getMaxChunkSize(Configuration conf) {
1430     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1431   }
1432 }