View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import java.io.DataInputStream;
20  import java.io.DataOutputStream;
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  
24  import org.apache.hadoop.hbase.KeyValue;
25  import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
26  import org.apache.hadoop.hbase.util.ByteBufferUtils;
27  import org.apache.hadoop.hbase.util.Bytes;
28  import org.apache.hadoop.io.RawComparator;
29  import org.apache.hadoop.io.WritableUtils;
30  
31  /**
32   * Base class for all data block encoders that use a buffer.
33   */
34  abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
35  
36    private static int INITIAL_KEY_BUFFER_SIZE = 512;
37  
38    @Override
39    public ByteBuffer uncompressKeyValues(DataInputStream source,
40        boolean includesMemstoreTS) throws IOException {
41      return uncompressKeyValues(source, 0, 0, includesMemstoreTS);
42    }
43  
44    protected static class SeekerState {
45      protected int valueOffset = -1;
46      protected int keyLength;
47      protected int valueLength;
48      protected int lastCommonPrefix;
49  
50      /** We need to store a copy of the key. */
51      protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
52  
53      protected long memstoreTS;
54      protected int nextKvOffset;
55  
56      protected boolean isValid() {
57        return valueOffset != -1;
58      }
59  
60      protected void invalidate() {
61        valueOffset = -1;
62      }
63  
64      protected void ensureSpaceForKey() {
65        if (keyLength > keyBuffer.length) {
66          // rare case, but we need to handle arbitrary length of key
67          int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
68          while (keyLength > newKeyBufferLength) {
69            newKeyBufferLength *= 2;
70          }
71          byte[] newKeyBuffer = new byte[newKeyBufferLength];
72          System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
73          keyBuffer = newKeyBuffer;
74        }
75      }
76  
77      /**
78       * Copy the state from the next one into this instance (the previous state
79       * placeholder). Used to save the previous state when we are advancing the
80       * seeker to the next key/value.
81       */
82      protected void copyFromNext(SeekerState nextState) {
83        if (keyBuffer.length != nextState.keyBuffer.length) {
84          keyBuffer = nextState.keyBuffer.clone();
85        } else if (!isValid()) {
86          // Note: we can only call isValid before we override our state, so this
87          // comes before all the assignments at the end of this method.
88          System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
89               nextState.keyLength);
90        } else {
91          // don't copy the common prefix between this key and the previous one
92          System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
93              keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
94                  - nextState.lastCommonPrefix);
95        }
96  
97        valueOffset = nextState.valueOffset;
98        keyLength = nextState.keyLength;
99        valueLength = nextState.valueLength;
100       lastCommonPrefix = nextState.lastCommonPrefix;
101       nextKvOffset = nextState.nextKvOffset;
102       memstoreTS = nextState.memstoreTS;
103     }
104 
105   }
106 
107   protected abstract static class
108       BufferedEncodedSeeker<STATE extends SeekerState>
109       implements EncodedSeeker {
110 
111     protected final RawComparator<byte[]> comparator;
112     protected final SamePrefixComparator<byte[]> samePrefixComparator;
113     protected ByteBuffer currentBuffer;
114     protected STATE current = createSeekerState(); // always valid
115     protected STATE previous = createSeekerState(); // may not be valid
116 
117     @SuppressWarnings("unchecked")
118     public BufferedEncodedSeeker(RawComparator<byte[]> comparator) {
119       this.comparator = comparator;
120       if (comparator instanceof SamePrefixComparator) {
121         this.samePrefixComparator = (SamePrefixComparator<byte[]>) comparator;
122       } else {
123         this.samePrefixComparator = null;
124       }
125     }
126 
127     @Override
128     public int compareKey(RawComparator<byte[]> comparator, byte[] key, int offset, int length) {
129       return comparator.compare(key,  offset, length, current.keyBuffer, 0, current.keyLength);
130     }
131 
132     @Override
133     public void setCurrentBuffer(ByteBuffer buffer) {
134       currentBuffer = buffer;
135       decodeFirst();
136       previous.invalidate();
137     }
138 
139     @Override
140     public ByteBuffer getKeyDeepCopy() {
141       ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
142       keyBuffer.put(current.keyBuffer, 0, current.keyLength);
143       return keyBuffer;
144     }
145 
146     @Override
147     public ByteBuffer getValueShallowCopy() {
148       return ByteBuffer.wrap(currentBuffer.array(),
149           currentBuffer.arrayOffset() + current.valueOffset,
150           current.valueLength);
151     }
152 
153     @Override
154     public ByteBuffer getKeyValueBuffer() {
155       ByteBuffer kvBuffer = ByteBuffer.allocate(
156           2 * Bytes.SIZEOF_INT + current.keyLength + current.valueLength);
157       kvBuffer.putInt(current.keyLength);
158       kvBuffer.putInt(current.valueLength);
159       kvBuffer.put(current.keyBuffer, 0, current.keyLength);
160       kvBuffer.put(currentBuffer.array(),
161           currentBuffer.arrayOffset() + current.valueOffset,
162           current.valueLength);
163       return kvBuffer;
164     }
165 
166     @Override
167     public KeyValue getKeyValue() {
168       ByteBuffer kvBuf = getKeyValueBuffer();
169       KeyValue kv = new KeyValue(kvBuf.array(), kvBuf.arrayOffset());
170       kv.setMemstoreTS(current.memstoreTS);
171       return kv;
172     }
173 
174     @Override
175     public void rewind() {
176       currentBuffer.rewind();
177       decodeFirst();
178       previous.invalidate();
179     }
180 
181     @Override
182     public boolean next() {
183       if (!currentBuffer.hasRemaining()) {
184         return false;
185       }
186       decodeNext();
187       previous.invalidate();
188       return true;
189     }
190 
191     @Override
192     public int seekToKeyInBlock(byte[] key, int offset, int length,
193         boolean seekBefore) {
194       int commonPrefix = 0;
195       previous.invalidate();
196       do {
197         int comp;
198         if (samePrefixComparator != null) {
199           commonPrefix = Math.min(commonPrefix, current.lastCommonPrefix);
200 
201           // extend commonPrefix
202           commonPrefix += ByteBufferUtils.findCommonPrefix(
203               key, offset + commonPrefix, length - commonPrefix,
204               current.keyBuffer, commonPrefix,
205               current.keyLength - commonPrefix);
206 
207           comp = samePrefixComparator.compareIgnoringPrefix(commonPrefix, key,
208               offset, length, current.keyBuffer, 0, current.keyLength);
209         } else {
210           comp = comparator.compare(key, offset, length,
211               current.keyBuffer, 0, current.keyLength);
212         }
213 
214         if (comp == 0) { // exact match
215           if (seekBefore) {
216             if (!previous.isValid()) {
217               // The caller (seekBefore) has to ensure that we are not at the
218               // first key in the block.
219               throw new IllegalStateException("Cannot seekBefore if " +
220                   "positioned at the first key in the block: key=" +
221                   Bytes.toStringBinary(key, offset, length));
222             }
223             moveToPrevious();
224             return 1;
225           }
226           return 0;
227         }
228 
229         if (comp < 0) { // already too large, check previous
230           if (previous.isValid()) {
231             moveToPrevious();
232           }
233           return 1;
234         }
235 
236         // move to next, if more data is available
237         if (currentBuffer.hasRemaining()) {
238           previous.copyFromNext(current);
239           decodeNext();
240         } else {
241           break;
242         }
243       } while (true);
244 
245       // we hit the end of the block, not an exact match
246       return 1;
247     }
248 
249     private void moveToPrevious() {
250       if (!previous.isValid()) {
251         throw new IllegalStateException(
252             "Can move back only once and not in first key in the block.");
253       }
254 
255       STATE tmp = previous;
256       previous = current;
257       current = tmp;
258 
259       // move after last key value
260       currentBuffer.position(current.nextKvOffset);
261 
262       previous.invalidate();
263     }
264 
265     @SuppressWarnings("unchecked")
266     protected STATE createSeekerState() {
267       // This will fail for non-default seeker state if the subclass does not
268       // override this method.
269       return (STATE) new SeekerState();
270     }
271 
272     abstract protected void decodeFirst();
273     abstract protected void decodeNext();
274   }
275 
276   protected final void afterEncodingKeyValue(ByteBuffer in,
277       DataOutputStream out, boolean includesMemstoreTS) {
278     if (includesMemstoreTS) {
279       // Copy memstore timestamp from the byte buffer to the output stream.
280       long memstoreTS = -1;
281       try {
282         memstoreTS = ByteBufferUtils.readVLong(in);
283         WritableUtils.writeVLong(out, memstoreTS);
284       } catch (IOException ex) {
285         throw new RuntimeException("Unable to copy memstore timestamp " +
286             memstoreTS + " after encoding a key/value");
287       }
288     }
289   }
290 
291   protected final void afterDecodingKeyValue(DataInputStream source,
292       ByteBuffer dest, boolean includesMemstoreTS) {
293     if (includesMemstoreTS) {
294       long memstoreTS = -1;
295       try {
296         // Copy memstore timestamp from the data input stream to the byte
297         // buffer.
298         memstoreTS = WritableUtils.readVLong(source);
299         ByteBufferUtils.writeVLong(dest, memstoreTS);
300       } catch (IOException ex) {
301         throw new RuntimeException("Unable to copy memstore timestamp " +
302             memstoreTS + " after decoding a key/value");
303       }
304     }
305   }
306 
307 }