001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import java.nio.ByteBuffer;
020
021import org.apache.hadoop.hbase.ByteBufferExtendedCell;
022import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
023import org.apache.hadoop.hbase.ByteBufferKeyValue;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellComparator;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.KeyValue;
029import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue;
030import org.apache.hadoop.hbase.PrivateCellUtil;
031import org.apache.hadoop.hbase.SizeCachedKeyValue;
032import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
033import org.apache.hadoop.hbase.io.encoding.AbstractDataBlockEncoder.AbstractEncodedSeeker;
034import org.apache.hadoop.hbase.nio.ByteBuff;
035import org.apache.hadoop.hbase.util.ByteBufferUtils;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.ObjectIntPair;
038import org.apache.yetus.audience.InterfaceAudience;
039
040@InterfaceAudience.Private
041public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
042
043  // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
044  // many object creations.
045  protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
046
047  private ByteBuff currentBuffer;
048  private SeekerState current = new SeekerState(); // always valid
049  private SeekerState previous = new SeekerState(); // may not be valid
050
051  private int rowNumber;
052  private ByteBuff rowOffsets = null;
053
054  public RowIndexSeekerV1(CellComparator comparator,
055      HFileBlockDecodingContext decodingCtx) {
056    super(comparator, decodingCtx);
057  }
058
059  @Override
060  public void setCurrentBuffer(ByteBuff buffer) {
061    int onDiskSize = buffer.getInt(buffer.limit() - Bytes.SIZEOF_INT);
062
063    // Data part
064    ByteBuff dup = buffer.duplicate();
065    dup.position(buffer.position());
066    dup.limit(buffer.position() + onDiskSize);
067    currentBuffer = dup.slice();
068    current.currentBuffer = currentBuffer;
069    buffer.skip(onDiskSize);
070
071    // Row offset
072    rowNumber = buffer.getInt();
073    int totalRowOffsetsLength = Bytes.SIZEOF_INT * rowNumber;
074    ByteBuff rowDup = buffer.duplicate();
075    rowDup.position(buffer.position());
076    rowDup.limit(buffer.position() + totalRowOffsetsLength);
077    rowOffsets = rowDup.slice();
078
079    decodeFirst();
080  }
081
082  @Override
083  public Cell getKey() {
084    if (current.keyBuffer.hasArray()) {
085      return new KeyValue.KeyOnlyKeyValue(current.keyBuffer.array(),
086          current.keyBuffer.arrayOffset() + current.keyBuffer.position(),
087          current.keyLength);
088    } else {
089      byte[] key = new byte[current.keyLength];
090      ByteBufferUtils.copyFromBufferToArray(key, current.keyBuffer,
091          current.keyBuffer.position(), 0, current.keyLength);
092      return new KeyValue.KeyOnlyKeyValue(key, 0, current.keyLength);
093    }
094  }
095
096  @Override
097  public ByteBuffer getValueShallowCopy() {
098    currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength,
099        tmpPair);
100    ByteBuffer dup = tmpPair.getFirst().duplicate();
101    dup.position(tmpPair.getSecond());
102    dup.limit(tmpPair.getSecond() + current.valueLength);
103    return dup.slice();
104  }
105
106  @Override
107  public Cell getCell() {
108    return current.toCell();
109  }
110
111  @Override
112  public void rewind() {
113    currentBuffer.rewind();
114    decodeFirst();
115  }
116
117  @Override
118  public boolean next() {
119    if (!currentBuffer.hasRemaining()) {
120      return false;
121    }
122    decodeNext();
123    previous.invalidate();
124    return true;
125  }
126
127  private int binarySearch(Cell seekCell, boolean seekBefore) {
128    int low = 0;
129    int high = rowNumber - 1;
130    int mid = low + ((high - low) >> 1);
131    int comp = 0;
132    while (low <= high) {
133      mid = low + ((high - low) >> 1);
134      ByteBuffer row = getRow(mid);
135      comp = compareRows(row, seekCell);
136      if (comp < 0) {
137        low = mid + 1;
138      } else if (comp > 0) {
139        high = mid - 1;
140      } else {
141        // key found
142        if (seekBefore) {
143          return mid - 1;
144        } else {
145          return mid;
146        }
147      }
148    }
149    // key not found.
150    if (comp > 0) {
151      return mid - 1;
152    } else {
153      return mid;
154    }
155  }
156
157  private int compareRows(ByteBuffer row, Cell seekCell) {
158    if (seekCell instanceof ByteBufferExtendedCell) {
159      return ByteBufferUtils.compareTo(row, row.position(), row.remaining(),
160          ((ByteBufferExtendedCell) seekCell).getRowByteBuffer(),
161          ((ByteBufferExtendedCell) seekCell).getRowPosition(),
162          seekCell.getRowLength());
163    } else {
164      return ByteBufferUtils.compareTo(row, row.position(), row.remaining(),
165          seekCell.getRowArray(), seekCell.getRowOffset(),
166          seekCell.getRowLength());
167    }
168  }
169
170  private ByteBuffer getRow(int index) {
171    int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
172    ByteBuff block = currentBuffer.duplicate();
173    block.position(offset + Bytes.SIZEOF_LONG);
174    short rowLen = block.getShort();
175    block.asSubByteBuffer(block.position(), rowLen, tmpPair);
176    ByteBuffer row = tmpPair.getFirst();
177    row.position(tmpPair.getSecond()).limit(tmpPair.getSecond() + rowLen);
178    return row;
179  }
180
181  @Override
182  public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
183    previous.invalidate();
184    int index = binarySearch(seekCell, seekBefore);
185    if (index < 0) {
186      return HConstants.INDEX_KEY_MAGIC; // using optimized index key
187    } else {
188      int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
189      if (offset != 0) {
190        decodeAtPosition(offset);
191      }
192    }
193    do {
194      int comp;
195      comp = PrivateCellUtil.compareKeyIgnoresMvcc(comparator, seekCell, current.currentKey);
196      if (comp == 0) { // exact match
197        if (seekBefore) {
198          if (!previous.isValid()) {
199            // The caller (seekBefore) has to ensure that we are not at the
200            // first key in the block.
201            throw new IllegalStateException("Cannot seekBefore if "
202                + "positioned at the first key in the block: key="
203                + Bytes.toStringBinary(seekCell.getRowArray()));
204          }
205          moveToPrevious();
206          return 1;
207        }
208        return 0;
209      }
210
211      if (comp < 0) { // already too large, check previous
212        if (previous.isValid()) {
213          moveToPrevious();
214        } else {
215          return HConstants.INDEX_KEY_MAGIC; // using optimized index key
216        }
217        return 1;
218      }
219
220      // move to next, if more data is available
221      if (currentBuffer.hasRemaining()) {
222        previous.copyFromNext(current);
223        decodeNext();
224      } else {
225        break;
226      }
227    } while (true);
228
229    // we hit the end of the block, not an exact match
230    return 1;
231  }
232
233  private void moveToPrevious() {
234    if (!previous.isValid()) {
235      throw new IllegalStateException("Can move back only once and not in first key in the block.");
236    }
237
238    SeekerState tmp = previous;
239    previous = current;
240    current = tmp;
241
242    // move after last key value
243    currentBuffer.position(current.nextKvOffset);
244    previous.invalidate();
245  }
246
247  @Override
248  public int compareKey(CellComparator comparator, Cell key) {
249    return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, current.currentKey);
250  }
251
252  protected void decodeFirst() {
253    decodeNext();
254    previous.invalidate();
255  }
256
257  protected void decodeAtPosition(int position) {
258    currentBuffer.position(position);
259    decodeNext();
260    previous.invalidate();
261  }
262
263  protected void decodeNext() {
264    current.startOffset = currentBuffer.position();
265    long ll = currentBuffer.getLongAfterPosition(0);
266    // Read top half as an int of key length and bottom int as value length
267    current.keyLength = (int) (ll >> Integer.SIZE);
268    current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
269    currentBuffer.skip(Bytes.SIZEOF_LONG);
270    // key part
271    currentBuffer.asSubByteBuffer(currentBuffer.position(), current.keyLength,
272        tmpPair);
273    ByteBuffer key = tmpPair.getFirst().duplicate();
274    key.position(tmpPair.getSecond()).limit(
275        tmpPair.getSecond() + current.keyLength);
276    current.keyBuffer = key;
277    currentBuffer.skip(current.keyLength);
278    // value part
279    current.valueOffset = currentBuffer.position();
280    currentBuffer.skip(current.valueLength);
281    if (includesTags()) {
282      decodeTags();
283    }
284    if (includesMvcc()) {
285      current.memstoreTS = ByteBuff.readVLong(currentBuffer);
286    } else {
287      current.memstoreTS = 0;
288    }
289    current.nextKvOffset = currentBuffer.position();
290    current.currentKey.setKey(current.keyBuffer, tmpPair.getSecond(),
291        current.keyLength);
292  }
293
294  protected void decodeTags() {
295    current.tagsLength = currentBuffer.getShortAfterPosition(0);
296    currentBuffer.skip(Bytes.SIZEOF_SHORT);
297    current.tagsOffset = currentBuffer.position();
298    currentBuffer.skip(current.tagsLength);
299  }
300
301  private class SeekerState {
302    /**
303     * The size of a (key length, value length) tuple that prefixes each entry
304     * in a data block.
305     */
306    public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
307
308    protected ByteBuff currentBuffer;
309    protected int startOffset = -1;
310    protected int valueOffset = -1;
311    protected int keyLength;
312    protected int valueLength;
313    protected int tagsLength = 0;
314    protected int tagsOffset = -1;
315
316    protected ByteBuffer keyBuffer = null;
317    protected long memstoreTS;
318    protected int nextKvOffset;
319    // buffer backed keyonlyKV
320    private ByteBufferKeyOnlyKeyValue currentKey = new ByteBufferKeyOnlyKeyValue();
321
322    protected boolean isValid() {
323      return valueOffset != -1;
324    }
325
326    protected void invalidate() {
327      valueOffset = -1;
328      currentKey = new ByteBufferKeyOnlyKeyValue();
329      currentBuffer = null;
330    }
331
332    /**
333     * Copy the state from the next one into this instance (the previous state placeholder). Used to
334     * save the previous state when we are advancing the seeker to the next key/value.
335     */
336    protected void copyFromNext(SeekerState nextState) {
337      keyBuffer = nextState.keyBuffer;
338      currentKey.setKey(nextState.keyBuffer,
339          nextState.currentKey.getRowPosition() - Bytes.SIZEOF_SHORT,
340          nextState.keyLength);
341
342      startOffset = nextState.startOffset;
343      valueOffset = nextState.valueOffset;
344      keyLength = nextState.keyLength;
345      valueLength = nextState.valueLength;
346      nextKvOffset = nextState.nextKvOffset;
347      memstoreTS = nextState.memstoreTS;
348      currentBuffer = nextState.currentBuffer;
349      tagsOffset = nextState.tagsOffset;
350      tagsLength = nextState.tagsLength;
351    }
352
353    @Override
354    public String toString() {
355      return CellUtil.getCellKeyAsString(toCell());
356    }
357
358    protected int getCellBufSize() {
359      int kvBufSize = KEY_VALUE_LEN_SIZE + keyLength + valueLength;
360      if (includesTags() && tagsLength > 0) {
361        kvBufSize += Bytes.SIZEOF_SHORT + tagsLength;
362      }
363      return kvBufSize;
364    }
365
366    public Cell toCell() {
367      Cell ret;
368      int cellBufSize = getCellBufSize();
369      long seqId = 0L;
370      if (includesMvcc()) {
371        seqId = memstoreTS;
372      }
373      if (currentBuffer.hasArray()) {
374        // TODO : reduce the varieties of KV here. Check if based on a boolean
375        // we can handle the 'no tags' case.
376        if (tagsLength > 0) {
377          ret = new SizeCachedKeyValue(currentBuffer.array(),
378              currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId);
379        } else {
380          ret = new SizeCachedNoTagsKeyValue(currentBuffer.array(),
381              currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId);
382        }
383      } else {
384        currentBuffer.asSubByteBuffer(startOffset, cellBufSize, tmpPair);
385        ByteBuffer buf = tmpPair.getFirst();
386        if (buf.isDirect()) {
387          ret =
388              tagsLength > 0 ? new ByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId)
389                  : new NoTagsByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId);
390        } else {
391          if (tagsLength > 0) {
392            ret = new SizeCachedKeyValue(buf.array(), buf.arrayOffset()
393                + tmpPair.getSecond(), cellBufSize, seqId);
394          } else {
395            ret = new SizeCachedNoTagsKeyValue(buf.array(), buf.arrayOffset()
396                + tmpPair.getSecond(), cellBufSize, seqId);
397          }
398        }
399      }
400      return ret;
401    }
402  }
403
404}