View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.HConstants;
26  import org.apache.hadoop.hbase.KeyValue;
27  import org.apache.hadoop.hbase.KeyValue.KVComparator;
28  import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
29  import org.apache.hadoop.hbase.io.TagCompressionContext;
30  import org.apache.hadoop.hbase.io.hfile.BlockType;
31  import org.apache.hadoop.hbase.io.hfile.HFileContext;
32  import org.apache.hadoop.hbase.io.util.LRUDictionary;
33  import org.apache.hadoop.hbase.util.ByteBufferUtils;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.io.WritableUtils;
36  
37  /**
38   * Base class for all data block encoders that use a buffer.
39   */
40  @InterfaceAudience.Private
41  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
42  
43    private static int INITIAL_KEY_BUFFER_SIZE = 512;
44  
45    @Override
46    public ByteBuffer decodeKeyValues(DataInputStream source,
47        HFileBlockDecodingContext blkDecodingCtx) throws IOException {
48      if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
49        throw new IOException(this.getClass().getName() + " only accepts "
50            + HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
51      }
52  
53      HFileBlockDefaultDecodingContext decodingCtx =
54          (HFileBlockDefaultDecodingContext) blkDecodingCtx;
55      if (decodingCtx.getHFileContext().isIncludesTags()
56          && decodingCtx.getHFileContext().isCompressTags()) {
57        if (decodingCtx.getTagCompressionContext() != null) {
58          // It will be overhead to create the TagCompressionContext again and again for every block
59          // decoding.
60          decodingCtx.getTagCompressionContext().clear();
61        } else {
62          try {
63            TagCompressionContext tagCompressionContext = new TagCompressionContext(
64                LRUDictionary.class, Byte.MAX_VALUE);
65            decodingCtx.setTagCompressionContext(tagCompressionContext);
66          } catch (Exception e) {
67            throw new IOException("Failed to initialize TagCompressionContext", e);
68          }
69        }
70      }
71      return internalDecodeKeyValues(source, 0, 0, decodingCtx);
72    }
73  
74    protected static class SeekerState {
75      protected int valueOffset = -1;
76      protected int keyLength;
77      protected int valueLength;
78      protected int lastCommonPrefix;
79      protected int tagsLength = 0;
80      protected int tagsOffset = -1;
81      protected int tagsCompressedLength = 0;
82      protected boolean uncompressTags = true;
83  
84      /** We need to store a copy of the key. */
85      protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
86      protected byte[] tagsBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
87  
88      protected long memstoreTS;
89      protected int nextKvOffset;
90  
91      protected boolean isValid() {
92        return valueOffset != -1;
93      }
94  
95      protected void invalidate() {
96        valueOffset = -1;
97        tagsCompressedLength = 0;
98        uncompressTags = true;
99      }
100 
101     protected void ensureSpaceForKey() {
102       if (keyLength > keyBuffer.length) {
103         // rare case, but we need to handle arbitrary length of key
104         int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
105         while (keyLength > newKeyBufferLength) {
106           newKeyBufferLength *= 2;
107         }
108         byte[] newKeyBuffer = new byte[newKeyBufferLength];
109         System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
110         keyBuffer = newKeyBuffer;
111       }
112     }
113 
114     protected void ensureSpaceForTags() {
115       if (tagsLength > tagsBuffer.length) {
116         // rare case, but we need to handle arbitrary length of tags
117         int newTagsBufferLength = Math.max(tagsBuffer.length, 1) * 2;
118         while (tagsLength > newTagsBufferLength) {
119           newTagsBufferLength *= 2;
120         }
121         byte[] newTagsBuffer = new byte[newTagsBufferLength];
122         System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
123         tagsBuffer = newTagsBuffer;
124       }
125     }
126 
127     /**
128      * Copy the state from the next one into this instance (the previous state
129      * placeholder). Used to save the previous state when we are advancing the
130      * seeker to the next key/value.
131      */
132     protected void copyFromNext(SeekerState nextState) {
133       if (keyBuffer.length != nextState.keyBuffer.length) {
134         keyBuffer = nextState.keyBuffer.clone();
135       } else if (!isValid()) {
136         // Note: we can only call isValid before we override our state, so this
137         // comes before all the assignments at the end of this method.
138         System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
139              nextState.keyLength);
140       } else {
141         // don't copy the common prefix between this key and the previous one
142         System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
143             keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
144                 - nextState.lastCommonPrefix);
145       }
146 
147       valueOffset = nextState.valueOffset;
148       keyLength = nextState.keyLength;
149       valueLength = nextState.valueLength;
150       lastCommonPrefix = nextState.lastCommonPrefix;
151       nextKvOffset = nextState.nextKvOffset;
152       memstoreTS = nextState.memstoreTS;
153     }
154 
155   }
156 
157   protected abstract static class
158       BufferedEncodedSeeker<STATE extends SeekerState>
159       implements EncodedSeeker {
160     protected HFileBlockDecodingContext decodingCtx;
161     protected final KVComparator comparator;
162     protected final SamePrefixComparator<byte[]> samePrefixComparator;
163     protected ByteBuffer currentBuffer;
164     protected STATE current = createSeekerState(); // always valid
165     protected STATE previous = createSeekerState(); // may not be valid
166     protected TagCompressionContext tagCompressionContext = null;
167 
168     public BufferedEncodedSeeker(KVComparator comparator,
169         HFileBlockDecodingContext decodingCtx) {
170       this.comparator = comparator;
171       this.samePrefixComparator = comparator;
172       this.decodingCtx = decodingCtx;
173       if (decodingCtx.getHFileContext().isCompressTags()) {
174         try {
175           tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
176         } catch (Exception e) {
177           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
178         }
179       }
180     }
181     
182     protected boolean includesMvcc() {
183       return this.decodingCtx.getHFileContext().isIncludesMvcc();
184     }
185 
186     protected boolean includesTags() {
187       return this.decodingCtx.getHFileContext().isIncludesTags();
188     }
189 
190     @Override
191     public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
192       return comparator.compareFlatKey(key, offset, length,
193           current.keyBuffer, 0, current.keyLength);
194     }
195 
196     @Override
197     public void setCurrentBuffer(ByteBuffer buffer) {
198       if (this.tagCompressionContext != null) {
199         this.tagCompressionContext.clear();
200       }
201       currentBuffer = buffer;
202       decodeFirst();
203       previous.invalidate();
204     }
205 
206     @Override
207     public ByteBuffer getKeyDeepCopy() {
208       ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
209       keyBuffer.put(current.keyBuffer, 0, current.keyLength);
210       return keyBuffer;
211     }
212 
213     @Override
214     public ByteBuffer getValueShallowCopy() {
215       return ByteBuffer.wrap(currentBuffer.array(),
216           currentBuffer.arrayOffset() + current.valueOffset,
217           current.valueLength);
218     }
219 
220     @Override
221     public ByteBuffer getKeyValueBuffer() {
222       ByteBuffer kvBuffer = createKVBuffer();
223       kvBuffer.putInt(current.keyLength);
224       kvBuffer.putInt(current.valueLength);
225       kvBuffer.put(current.keyBuffer, 0, current.keyLength);
226       kvBuffer.put(currentBuffer.array(),
227           currentBuffer.arrayOffset() + current.valueOffset,
228           current.valueLength);
229       if (current.tagsLength > 0) {
230         kvBuffer.putShort((short) current.tagsLength);
231         if (current.tagsOffset != -1) {
232           // the offset of the tags bytes in the underlying buffer is marked. So the temp
233           // buffer,tagsBuffer was not been used.
234           kvBuffer.put(currentBuffer.array(), currentBuffer.arrayOffset() + current.tagsOffset,
235               current.tagsLength);
236         } else {
237           // When tagsOffset is marked as -1, tag compression was present and so the tags were
238           // uncompressed into temp buffer, tagsBuffer. Let us copy it from there
239           kvBuffer.put(current.tagsBuffer, 0, current.tagsLength);
240         }
241       }
242       return kvBuffer;
243     }
244 
245     protected ByteBuffer createKVBuffer() {
246       int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(current.keyLength,
247           current.valueLength, current.tagsLength);
248       ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
249       return kvBuffer;
250     }
251 
252     @Override
253     public KeyValue getKeyValue() {
254       ByteBuffer kvBuf = getKeyValueBuffer();
255       KeyValue kv = new KeyValue(kvBuf.array(), kvBuf.arrayOffset(), kvBuf.array().length
256           - kvBuf.arrayOffset());
257       kv.setMvccVersion(current.memstoreTS);
258       return kv;
259     }
260 
261     @Override
262     public void rewind() {
263       currentBuffer.rewind();
264       if (tagCompressionContext != null) {
265         tagCompressionContext.clear();
266       }
267       decodeFirst();
268       previous.invalidate();
269     }
270 
271     @Override
272     public boolean next() {
273       if (!currentBuffer.hasRemaining()) {
274         return false;
275       }
276       decodeNext();
277       previous.invalidate();
278       return true;
279     }
280 
281     protected void decodeTags() {
282       current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
283       if (tagCompressionContext != null) {
284         if (current.uncompressTags) {
285           // Tag compression is been used. uncompress it into tagsBuffer
286           current.ensureSpaceForTags();
287           try {
288             current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
289                 current.tagsBuffer, 0, current.tagsLength);
290           } catch (IOException e) {
291             throw new RuntimeException("Exception while uncompressing tags", e);
292           }
293         } else {
294           ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
295           current.uncompressTags = true;// Reset this.
296         }
297         current.tagsOffset = -1;
298       } else {
299         // When tag compress is not used, let us not do temp copying of tags bytes into tagsBuffer.
300         // Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
301         current.tagsOffset = currentBuffer.position();
302         ByteBufferUtils.skip(currentBuffer, current.tagsLength);
303       }
304     }
305 
306     @Override
307     public int seekToKeyInBlock(byte[] key, int offset, int length,
308         boolean seekBefore) {
309       int commonPrefix = 0;
310       previous.invalidate();
311       do {
312         int comp;
313         if (samePrefixComparator != null) {
314           commonPrefix = Math.min(commonPrefix, current.lastCommonPrefix);
315 
316           // extend commonPrefix
317           commonPrefix += ByteBufferUtils.findCommonPrefix(
318               key, offset + commonPrefix, length - commonPrefix,
319               current.keyBuffer, commonPrefix,
320               current.keyLength - commonPrefix);
321 
322           comp = samePrefixComparator.compareIgnoringPrefix(commonPrefix, key,
323               offset, length, current.keyBuffer, 0, current.keyLength);
324         } else {
325           comp = comparator.compareFlatKey(key, offset, length,
326               current.keyBuffer, 0, current.keyLength);
327         }
328 
329         if (comp == 0) { // exact match
330           if (seekBefore) {
331             if (!previous.isValid()) {
332               // The caller (seekBefore) has to ensure that we are not at the
333               // first key in the block.
334               throw new IllegalStateException("Cannot seekBefore if " +
335                   "positioned at the first key in the block: key=" +
336                   Bytes.toStringBinary(key, offset, length));
337             }
338             moveToPrevious();
339             return 1;
340           }
341           return 0;
342         }
343 
344         if (comp < 0) { // already too large, check previous
345           if (previous.isValid()) {
346             moveToPrevious();
347           } else {
348             return HConstants.INDEX_KEY_MAGIC; // using optimized index key
349           }
350           return 1;
351         }
352 
353         // move to next, if more data is available
354         if (currentBuffer.hasRemaining()) {
355           previous.copyFromNext(current);
356           decodeNext();
357         } else {
358           break;
359         }
360       } while (true);
361 
362       // we hit the end of the block, not an exact match
363       return 1;
364     }
365 
366     private void moveToPrevious() {
367       if (!previous.isValid()) {
368         throw new IllegalStateException(
369             "Can move back only once and not in first key in the block.");
370       }
371 
372       STATE tmp = previous;
373       previous = current;
374       current = tmp;
375 
376       // move after last key value
377       currentBuffer.position(current.nextKvOffset);
378       // Already decoded the tag bytes. We cache this tags into current state and also the total
379       // compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
380       // the tags again. This might pollute the Data Dictionary what we use for the compression.
381       // When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
382       // 'tagsCompressedLength' bytes of source stream.
383       // See in decodeTags()
384       current.tagsBuffer = previous.tagsBuffer;
385       current.tagsCompressedLength = previous.tagsCompressedLength;
386       current.uncompressTags = false;
387       previous.invalidate();
388     }
389 
390     @SuppressWarnings("unchecked")
391     protected STATE createSeekerState() {
392       // This will fail for non-default seeker state if the subclass does not
393       // override this method.
394       return (STATE) new SeekerState();
395     }
396 
397     abstract protected void decodeFirst();
398     abstract protected void decodeNext();
399   }
400 
401   protected final void afterEncodingKeyValue(ByteBuffer in,
402       DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
403     if (encodingCtx.getHFileContext().isIncludesTags()) {
404       short tagsLength = in.getShort();
405       ByteBufferUtils.putCompressedInt(out, tagsLength);
406       // There are some tags to be written
407       if (tagsLength > 0) {
408         TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
409         // When tag compression is enabled, tagCompressionContext will have a not null value. Write
410         // the tags using Dictionary compression in such a case
411         if (tagCompressionContext != null) {
412           tagCompressionContext.compressTags(out, in, tagsLength);
413         } else {
414           ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
415         }
416       }
417     }
418     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
419       // Copy memstore timestamp from the byte buffer to the output stream.
420       long memstoreTS = -1;
421       try {
422         memstoreTS = ByteBufferUtils.readVLong(in);
423         WritableUtils.writeVLong(out, memstoreTS);
424       } catch (IOException ex) {
425         throw new RuntimeException("Unable to copy memstore timestamp " +
426             memstoreTS + " after encoding a key/value");
427       }
428     }
429   }
430 
431   protected final void afterDecodingKeyValue(DataInputStream source,
432       ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
433     if (decodingCtx.getHFileContext().isIncludesTags()) {
434       short tagsLength = (short) ByteBufferUtils.readCompressedInt(source);
435       dest.putShort(tagsLength);
436       if (tagsLength > 0) {
437         TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
438         // When tag compression is been used in this file, tagCompressionContext will have a not
439         // null value passed.
440         if (tagCompressionContext != null) {
441           tagCompressionContext.uncompressTags(source, dest, tagsLength);
442         } else {
443           ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
444         }
445       }
446     }
447     if (decodingCtx.getHFileContext().isIncludesMvcc()) {
448       long memstoreTS = -1;
449       try {
450         // Copy memstore timestamp from the data input stream to the byte
451         // buffer.
452         memstoreTS = WritableUtils.readVLong(source);
453         ByteBufferUtils.writeVLong(dest, memstoreTS);
454       } catch (IOException ex) {
455         throw new RuntimeException("Unable to copy memstore timestamp " +
456             memstoreTS + " after decoding a key/value");
457       }
458     }
459   }
460 
461   @Override
462   public HFileBlockEncodingContext newDataBlockEncodingContext(DataBlockEncoding encoding,
463       byte[] header, HFileContext meta) {
464     return new HFileBlockDefaultEncodingContext(encoding, header, meta);
465   }
466 
467   @Override
468   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
469     return new HFileBlockDefaultDecodingContext(meta);
470   }
471 
472   /**
473    * Compress KeyValues and write them to output buffer.
474    * @param out Where to write compressed data.
475    * @param in Source of KeyValue for compression.
476    * @param encodingCtx use the Encoding ctx associated with the current block
477    * @throws IOException If there is an error writing to output stream.
478    */
479   public abstract void internalEncodeKeyValues(DataOutputStream out,
480       ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException;
481 
482   protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
483       int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
484       throws IOException;
485 
486   @Override
487   public void encodeKeyValues(ByteBuffer in,
488       HFileBlockEncodingContext blkEncodingCtx) throws IOException {
489     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
490       throw new IOException (this.getClass().getName() + " only accepts "
491           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
492           "encoding context.");
493     }
494 
495     HFileBlockDefaultEncodingContext encodingCtx =
496         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
497     encodingCtx.prepareEncoding();
498     DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
499     if (encodingCtx.getHFileContext().isIncludesTags()
500         && encodingCtx.getHFileContext().isCompressTags()) {
501       if (encodingCtx.getTagCompressionContext() != null) {
502         // It will be overhead to create the TagCompressionContext again and again for every block
503         // encoding.
504         encodingCtx.getTagCompressionContext().clear();
505       } else {
506         try {
507           TagCompressionContext tagCompressionContext = new TagCompressionContext(
508               LRUDictionary.class, Byte.MAX_VALUE);
509           encodingCtx.setTagCompressionContext(tagCompressionContext);
510         } catch (Exception e) {
511           throw new IOException("Failed to initialize TagCompressionContext", e);
512         }
513       }
514     }
515     internalEncodeKeyValues(dataOut, in, encodingCtx);
516     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
517       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
518     } else {
519       encodingCtx.postEncoding(BlockType.DATA);
520     }
521   }
522 
523   /**
524    * Asserts that there is at least the given amount of unfilled space
525    * remaining in the given buffer.
526    * @param out typically, the buffer we are writing to
527    * @param length the required space in the buffer
528    * @throws EncoderBufferTooSmallException If there are no enough bytes.
529    */
530   protected static void ensureSpace(ByteBuffer out, int length)
531       throws EncoderBufferTooSmallException {
532     if (out.position() + length > out.limit()) {
533       throw new EncoderBufferTooSmallException(
534           "Buffer position=" + out.position() +
535           ", buffer limit=" + out.limit() +
536           ", length to be written=" + length);
537     }
538   }
539 
540 }