001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import java.nio.ByteBuffer;
020
021import org.apache.hadoop.hbase.ByteBufferExtendedCell;
022import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
023import org.apache.hadoop.hbase.ByteBufferKeyValue;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellComparator;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.KeyValue;
029import org.apache.hadoop.hbase.PrivateCellUtil;
030import org.apache.hadoop.hbase.SizeCachedKeyValue;
031import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
032import org.apache.hadoop.hbase.io.encoding.AbstractDataBlockEncoder.AbstractEncodedSeeker;
033import org.apache.hadoop.hbase.nio.ByteBuff;
034import org.apache.hadoop.hbase.util.ByteBufferUtils;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.ObjectIntPair;
037import org.apache.yetus.audience.InterfaceAudience;
038
039@InterfaceAudience.Private
040public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
041
042  // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
043  // many object creations.
044  protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
045
046  private ByteBuff currentBuffer;
047  private SeekerState current = new SeekerState(); // always valid
048  private SeekerState previous = new SeekerState(); // may not be valid
049
050  private int rowNumber;
051  private ByteBuff rowOffsets = null;
052
053  public RowIndexSeekerV1(CellComparator comparator,
054      HFileBlockDecodingContext decodingCtx) {
055    super(comparator, decodingCtx);
056  }
057
058  @Override
059  public void setCurrentBuffer(ByteBuff buffer) {
060    int onDiskSize = buffer.getInt(buffer.limit() - Bytes.SIZEOF_INT);
061
062    // Data part
063    ByteBuff dup = buffer.duplicate();
064    dup.position(buffer.position());
065    dup.limit(buffer.position() + onDiskSize);
066    currentBuffer = dup.slice();
067    current.currentBuffer = currentBuffer;
068    buffer.skip(onDiskSize);
069
070    // Row offset
071    rowNumber = buffer.getInt();
072    int totalRowOffsetsLength = Bytes.SIZEOF_INT * rowNumber;
073    ByteBuff rowDup = buffer.duplicate();
074    rowDup.position(buffer.position());
075    rowDup.limit(buffer.position() + totalRowOffsetsLength);
076    rowOffsets = rowDup.slice();
077
078    decodeFirst();
079  }
080
081  @Override
082  public Cell getKey() {
083    if (current.keyBuffer.hasArray()) {
084      return new KeyValue.KeyOnlyKeyValue(current.keyBuffer.array(),
085          current.keyBuffer.arrayOffset() + current.keyBuffer.position(),
086          current.keyLength);
087    } else {
088      byte[] key = new byte[current.keyLength];
089      ByteBufferUtils.copyFromBufferToArray(key, current.keyBuffer,
090          current.keyBuffer.position(), 0, current.keyLength);
091      return new KeyValue.KeyOnlyKeyValue(key, 0, current.keyLength);
092    }
093  }
094
095  @Override
096  public ByteBuffer getValueShallowCopy() {
097    currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength,
098        tmpPair);
099    ByteBuffer dup = tmpPair.getFirst().duplicate();
100    dup.position(tmpPair.getSecond());
101    dup.limit(tmpPair.getSecond() + current.valueLength);
102    return dup.slice();
103  }
104
105  @Override
106  public Cell getCell() {
107    return current.toCell();
108  }
109
110  @Override
111  public void rewind() {
112    currentBuffer.rewind();
113    decodeFirst();
114  }
115
116  @Override
117  public boolean next() {
118    if (!currentBuffer.hasRemaining()) {
119      return false;
120    }
121    decodeNext();
122    previous.invalidate();
123    return true;
124  }
125
126  private int binarySearch(Cell seekCell, boolean seekBefore) {
127    int low = 0;
128    int high = rowNumber - 1;
129    int mid = (low + high) >>> 1;
130    int comp = 0;
131    while (low <= high) {
132      mid = (low + high) >>> 1;
133      ByteBuffer row = getRow(mid);
134      comp = compareRows(row, seekCell);
135      if (comp < 0) {
136        low = mid + 1;
137      } else if (comp > 0) {
138        high = mid - 1;
139      } else {
140        // key found
141        if (seekBefore) {
142          return mid - 1;
143        } else {
144          return mid;
145        }
146      }
147    }
148    // key not found.
149    if (comp > 0) {
150      return mid - 1;
151    } else {
152      return mid;
153    }
154  }
155
156  private int compareRows(ByteBuffer row, Cell seekCell) {
157    if (seekCell instanceof ByteBufferExtendedCell) {
158      return ByteBufferUtils.compareTo(row, row.position(), row.remaining(),
159          ((ByteBufferExtendedCell) seekCell).getRowByteBuffer(),
160          ((ByteBufferExtendedCell) seekCell).getRowPosition(),
161          seekCell.getRowLength());
162    } else {
163      return ByteBufferUtils.compareTo(row, row.position(), row.remaining(),
164          seekCell.getRowArray(), seekCell.getRowOffset(),
165          seekCell.getRowLength());
166    }
167  }
168
169  private ByteBuffer getRow(int index) {
170    int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
171    ByteBuff block = currentBuffer.duplicate();
172    block.position(offset + Bytes.SIZEOF_LONG);
173    short rowLen = block.getShort();
174    block.asSubByteBuffer(block.position(), rowLen, tmpPair);
175    ByteBuffer row = tmpPair.getFirst();
176    row.position(tmpPair.getSecond()).limit(tmpPair.getSecond() + rowLen);
177    return row;
178  }
179
180  @Override
181  public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
182    previous.invalidate();
183    int index = binarySearch(seekCell, seekBefore);
184    if (index < 0) {
185      return HConstants.INDEX_KEY_MAGIC; // using optimized index key
186    } else {
187      int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
188      if (offset != 0) {
189        decodeAtPosition(offset);
190      }
191    }
192    do {
193      int comp;
194      comp = PrivateCellUtil.compareKeyIgnoresMvcc(comparator, seekCell, current.currentKey);
195      if (comp == 0) { // exact match
196        if (seekBefore) {
197          if (!previous.isValid()) {
198            // The caller (seekBefore) has to ensure that we are not at the
199            // first key in the block.
200            throw new IllegalStateException("Cannot seekBefore if "
201                + "positioned at the first key in the block: key="
202                + Bytes.toStringBinary(seekCell.getRowArray()));
203          }
204          moveToPrevious();
205          return 1;
206        }
207        return 0;
208      }
209
210      if (comp < 0) { // already too large, check previous
211        if (previous.isValid()) {
212          moveToPrevious();
213        } else {
214          return HConstants.INDEX_KEY_MAGIC; // using optimized index key
215        }
216        return 1;
217      }
218
219      // move to next, if more data is available
220      if (currentBuffer.hasRemaining()) {
221        previous.copyFromNext(current);
222        decodeNext();
223      } else {
224        break;
225      }
226    } while (true);
227
228    // we hit the end of the block, not an exact match
229    return 1;
230  }
231
232  private void moveToPrevious() {
233    if (!previous.isValid()) {
234      throw new IllegalStateException("Can move back only once and not in first key in the block.");
235    }
236
237    SeekerState tmp = previous;
238    previous = current;
239    current = tmp;
240
241    // move after last key value
242    currentBuffer.position(current.nextKvOffset);
243    previous.invalidate();
244  }
245
246  @Override
247  public int compareKey(CellComparator comparator, Cell key) {
248    return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, current.currentKey);
249  }
250
251  protected void decodeFirst() {
252    decodeNext();
253    previous.invalidate();
254  }
255
256  protected void decodeAtPosition(int position) {
257    currentBuffer.position(position);
258    decodeNext();
259    previous.invalidate();
260  }
261
262  protected void decodeNext() {
263    current.startOffset = currentBuffer.position();
264    long ll = currentBuffer.getLongAfterPosition(0);
265    // Read top half as an int of key length and bottom int as value length
266    current.keyLength = (int) (ll >> Integer.SIZE);
267    current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
268    currentBuffer.skip(Bytes.SIZEOF_LONG);
269    // key part
270    currentBuffer.asSubByteBuffer(currentBuffer.position(), current.keyLength,
271        tmpPair);
272    ByteBuffer key = tmpPair.getFirst().duplicate();
273    key.position(tmpPair.getSecond()).limit(
274        tmpPair.getSecond() + current.keyLength);
275    current.keyBuffer = key;
276    currentBuffer.skip(current.keyLength);
277    // value part
278    current.valueOffset = currentBuffer.position();
279    currentBuffer.skip(current.valueLength);
280    if (includesTags()) {
281      decodeTags();
282    }
283    if (includesMvcc()) {
284      current.memstoreTS = ByteBuff.readVLong(currentBuffer);
285    } else {
286      current.memstoreTS = 0;
287    }
288    current.nextKvOffset = currentBuffer.position();
289    current.currentKey.setKey(current.keyBuffer, tmpPair.getSecond(),
290        current.keyLength);
291  }
292
293  protected void decodeTags() {
294    current.tagsLength = currentBuffer.getShortAfterPosition(0);
295    currentBuffer.skip(Bytes.SIZEOF_SHORT);
296    current.tagsOffset = currentBuffer.position();
297    currentBuffer.skip(current.tagsLength);
298  }
299
300  private class SeekerState {
301    /**
302     * The size of a (key length, value length) tuple that prefixes each entry
303     * in a data block.
304     */
305    public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
306
307    protected ByteBuff currentBuffer;
308    protected int startOffset = -1;
309    protected int valueOffset = -1;
310    protected int keyLength;
311    protected int valueLength;
312    protected int tagsLength = 0;
313    protected int tagsOffset = -1;
314
315    protected ByteBuffer keyBuffer = null;
316    protected long memstoreTS;
317    protected int nextKvOffset;
318    // buffer backed keyonlyKV
319    private ByteBufferKeyOnlyKeyValue currentKey = new ByteBufferKeyOnlyKeyValue();
320
321    protected boolean isValid() {
322      return valueOffset != -1;
323    }
324
325    protected void invalidate() {
326      valueOffset = -1;
327      currentKey = new ByteBufferKeyOnlyKeyValue();
328      currentBuffer = null;
329    }
330
331    /**
332     * Copy the state from the next one into this instance (the previous state placeholder). Used to
333     * save the previous state when we are advancing the seeker to the next key/value.
334     */
335    protected void copyFromNext(SeekerState nextState) {
336      keyBuffer = nextState.keyBuffer;
337      currentKey.setKey(nextState.keyBuffer,
338          nextState.currentKey.getRowPosition() - Bytes.SIZEOF_SHORT,
339          nextState.keyLength);
340
341      startOffset = nextState.startOffset;
342      valueOffset = nextState.valueOffset;
343      keyLength = nextState.keyLength;
344      valueLength = nextState.valueLength;
345      nextKvOffset = nextState.nextKvOffset;
346      memstoreTS = nextState.memstoreTS;
347      currentBuffer = nextState.currentBuffer;
348      tagsOffset = nextState.tagsOffset;
349      tagsLength = nextState.tagsLength;
350    }
351
352    @Override
353    public String toString() {
354      return CellUtil.getCellKeyAsString(toCell());
355    }
356
357    protected int getCellBufSize() {
358      int kvBufSize = KEY_VALUE_LEN_SIZE + keyLength + valueLength;
359      if (includesTags() && tagsLength > 0) {
360        kvBufSize += Bytes.SIZEOF_SHORT + tagsLength;
361      }
362      return kvBufSize;
363    }
364
365    public Cell toCell() {
366      Cell ret;
367      int cellBufSize = getCellBufSize();
368      long seqId = 0L;
369      if (includesMvcc()) {
370        seqId = memstoreTS;
371      }
372      if (currentBuffer.hasArray()) {
373        // TODO : reduce the varieties of KV here. Check if based on a boolean
374        // we can handle the 'no tags' case.
375        if (tagsLength > 0) {
376          ret = new SizeCachedKeyValue(currentBuffer.array(),
377              currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId);
378        } else {
379          ret = new SizeCachedNoTagsKeyValue(currentBuffer.array(),
380              currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId);
381        }
382      } else {
383        currentBuffer.asSubByteBuffer(startOffset, cellBufSize, tmpPair);
384        ByteBuffer buf = tmpPair.getFirst();
385        if (buf.isDirect()) {
386          ret = new ByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId);
387        } else {
388          if (tagsLength > 0) {
389            ret = new SizeCachedKeyValue(buf.array(), buf.arrayOffset()
390                + tmpPair.getSecond(), cellBufSize, seqId);
391          } else {
392            ret = new SizeCachedNoTagsKeyValue(buf.array(), buf.arrayOffset()
393                + tmpPair.getSecond(), cellBufSize, seqId);
394          }
395        }
396      }
397      return ret;
398    }
399  }
400
401}