001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import java.nio.ByteBuffer;
021import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
022import org.apache.hadoop.hbase.Cell;
023import org.apache.hadoop.hbase.CellComparator;
024import org.apache.hadoop.hbase.CellUtil;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.KeyValue;
027import org.apache.hadoop.hbase.PrivateCellUtil;
028import org.apache.hadoop.hbase.SizeCachedByteBufferKeyValue;
029import org.apache.hadoop.hbase.SizeCachedKeyValue;
030import org.apache.hadoop.hbase.SizeCachedNoTagsByteBufferKeyValue;
031import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
032import org.apache.hadoop.hbase.io.encoding.AbstractDataBlockEncoder.AbstractEncodedSeeker;
033import org.apache.hadoop.hbase.nio.ByteBuff;
034import org.apache.hadoop.hbase.util.ByteBufferUtils;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.ObjectIntPair;
037import org.apache.yetus.audience.InterfaceAudience;
038
039@InterfaceAudience.Private
040public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
041
042  // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
043  // many object creations.
044  protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
045
046  private ByteBuff currentBuffer;
047  private SeekerState current = new SeekerState(); // always valid
048  private SeekerState previous = new SeekerState(); // may not be valid
049
050  private int rowNumber;
051  private ByteBuff rowOffsets = null;
052  private final CellComparator cellComparator;
053
054  public RowIndexSeekerV1(HFileBlockDecodingContext decodingCtx) {
055    super(decodingCtx);
056    this.cellComparator = decodingCtx.getHFileContext().getCellComparator();
057  }
058
059  @Override
060  public void setCurrentBuffer(ByteBuff buffer) {
061    int onDiskSize = buffer.getInt(buffer.limit() - Bytes.SIZEOF_INT);
062
063    // Data part
064    ByteBuff dup = buffer.duplicate();
065    dup.position(buffer.position());
066    dup.limit(buffer.position() + onDiskSize);
067    currentBuffer = dup.slice();
068    current.currentBuffer = currentBuffer;
069    buffer.skip(onDiskSize);
070
071    // Row offset
072    rowNumber = buffer.getInt();
073    int totalRowOffsetsLength = Bytes.SIZEOF_INT * rowNumber;
074    ByteBuff rowDup = buffer.duplicate();
075    rowDup.position(buffer.position());
076    rowDup.limit(buffer.position() + totalRowOffsetsLength);
077    rowOffsets = rowDup.slice();
078
079    decodeFirst();
080  }
081
082  @Override
083  @SuppressWarnings("ByteBufferBackingArray")
084  public Cell getKey() {
085    if (current.keyBuffer.hasArray()) {
086      return new KeyValue.KeyOnlyKeyValue(current.keyBuffer.array(),
087        current.keyBuffer.arrayOffset() + current.keyBuffer.position(), current.keyLength);
088    } else {
089      final byte[] key = new byte[current.keyLength];
090      ByteBufferUtils.copyFromBufferToArray(key, current.keyBuffer, current.keyBuffer.position(), 0,
091        current.keyLength);
092      return new KeyValue.KeyOnlyKeyValue(key, 0, current.keyLength);
093    }
094  }
095
096  @Override
097  public ByteBuffer getValueShallowCopy() {
098    currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
099    ByteBuffer dup = tmpPair.getFirst().duplicate();
100    dup.position(tmpPair.getSecond());
101    dup.limit(tmpPair.getSecond() + current.valueLength);
102    return dup.slice();
103  }
104
105  @Override
106  public Cell getCell() {
107    return current.toCell();
108  }
109
110  @Override
111  public void rewind() {
112    currentBuffer.rewind();
113    decodeFirst();
114  }
115
116  @Override
117  public boolean next() {
118    if (!currentBuffer.hasRemaining()) {
119      return false;
120    }
121    decodeNext();
122    previous.invalidate();
123    return true;
124  }
125
126  private int binarySearch(Cell seekCell, boolean seekBefore) {
127    int low = 0;
128    int high = rowNumber - 1;
129    int mid = low + ((high - low) >> 1);
130    int comp = 0;
131    while (low <= high) {
132      mid = low + ((high - low) >> 1);
133      comp = this.cellComparator.compareRows(getRow(mid), seekCell);
134      if (comp < 0) {
135        low = mid + 1;
136      } else if (comp > 0) {
137        high = mid - 1;
138      } else {
139        // key found
140        if (seekBefore) {
141          return mid - 1;
142        } else {
143          return mid;
144        }
145      }
146    }
147    // key not found.
148    if (comp > 0) {
149      return mid - 1;
150    } else {
151      return mid;
152    }
153  }
154
155  private ByteBuffer getRow(int index) {
156    int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
157    ByteBuff block = currentBuffer.duplicate();
158    block.position(offset + Bytes.SIZEOF_LONG);
159    short rowLen = block.getShort();
160    block.asSubByteBuffer(block.position(), rowLen, tmpPair);
161    ByteBuffer row = tmpPair.getFirst();
162    row.position(tmpPair.getSecond()).limit(tmpPair.getSecond() + rowLen);
163    return row;
164  }
165
166  @Override
167  public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
168    previous.invalidate();
169    int index = binarySearch(seekCell, seekBefore);
170    if (index < 0) {
171      return HConstants.INDEX_KEY_MAGIC; // using optimized index key
172    } else {
173      int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
174      if (offset != 0) {
175        decodeAtPosition(offset);
176      }
177    }
178    do {
179      int comp =
180        PrivateCellUtil.compareKeyIgnoresMvcc(this.cellComparator, seekCell, current.currentKey);
181      if (comp == 0) { // exact match
182        if (seekBefore) {
183          if (!previous.isValid()) {
184            // The caller (seekBefore) has to ensure that we are not at the
185            // first key in the block.
186            throw new IllegalStateException(
187              "Cannot seekBefore if " + "positioned at the first key in the block: key="
188                + Bytes.toStringBinary(seekCell.getRowArray()));
189          }
190          moveToPrevious();
191          return 1;
192        }
193        return 0;
194      }
195
196      if (comp < 0) { // already too large, check previous
197        if (previous.isValid()) {
198          moveToPrevious();
199        } else {
200          return HConstants.INDEX_KEY_MAGIC; // using optimized index key
201        }
202        return 1;
203      }
204
205      // move to next, if more data is available
206      if (currentBuffer.hasRemaining()) {
207        previous.copyFromNext(current);
208        decodeNext();
209      } else {
210        break;
211      }
212    } while (true);
213
214    // we hit the end of the block, not an exact match
215    return 1;
216  }
217
218  private void moveToPrevious() {
219    if (!previous.isValid()) {
220      throw new IllegalStateException("Can move back only once and not in first key in the block.");
221    }
222
223    SeekerState tmp = previous;
224    previous = current;
225    current = tmp;
226
227    // move after last key value
228    currentBuffer.position(current.nextKvOffset);
229    previous.invalidate();
230  }
231
232  @Override
233  public int compareKey(CellComparator comparator, Cell key) {
234    return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, current.currentKey);
235  }
236
237  protected void decodeFirst() {
238    decodeNext();
239    previous.invalidate();
240  }
241
242  protected void decodeAtPosition(int position) {
243    currentBuffer.position(position);
244    decodeNext();
245    previous.invalidate();
246  }
247
248  protected void decodeNext() {
249    current.startOffset = currentBuffer.position();
250    long ll = currentBuffer.getLongAfterPosition(0);
251    // Read top half as an int of key length and bottom int as value length
252    current.keyLength = (int) (ll >> Integer.SIZE);
253    current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
254    currentBuffer.skip(Bytes.SIZEOF_LONG);
255    // key part
256    currentBuffer.asSubByteBuffer(currentBuffer.position(), current.keyLength, tmpPair);
257    ByteBuffer key = tmpPair.getFirst().duplicate();
258    key.position(tmpPair.getSecond()).limit(tmpPair.getSecond() + current.keyLength);
259    current.keyBuffer = key;
260    currentBuffer.skip(current.keyLength);
261    // value part
262    current.valueOffset = currentBuffer.position();
263    currentBuffer.skip(current.valueLength);
264    if (includesTags()) {
265      decodeTags();
266    }
267    if (includesMvcc()) {
268      current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
269    } else {
270      current.memstoreTS = 0;
271    }
272    current.nextKvOffset = currentBuffer.position();
273    current.currentKey.setKey(current.keyBuffer, tmpPair.getSecond(), current.keyLength);
274  }
275
276  protected void decodeTags() {
277    current.tagsLength = currentBuffer.getShortAfterPosition(0);
278    currentBuffer.skip(Bytes.SIZEOF_SHORT);
279    current.tagsOffset = currentBuffer.position();
280    currentBuffer.skip(current.tagsLength);
281  }
282
283  private class SeekerState {
284    /**
285     * The size of a (key length, value length) tuple that prefixes each entry in a data block.
286     */
287    public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
288
289    protected ByteBuff currentBuffer;
290    protected int startOffset = -1;
291    protected int valueOffset = -1;
292    protected int keyLength;
293    protected int valueLength;
294    protected int tagsLength = 0;
295    protected int tagsOffset = -1;
296
297    protected ByteBuffer keyBuffer = null;
298    protected long memstoreTS;
299    protected int nextKvOffset;
300    // buffer backed keyonlyKV
301    private ByteBufferKeyOnlyKeyValue currentKey = new ByteBufferKeyOnlyKeyValue();
302
303    protected boolean isValid() {
304      return valueOffset != -1;
305    }
306
307    protected void invalidate() {
308      valueOffset = -1;
309      currentKey = new ByteBufferKeyOnlyKeyValue();
310      currentBuffer = null;
311    }
312
313    /**
314     * Copy the state from the next one into this instance (the previous state placeholder). Used to
315     * save the previous state when we are advancing the seeker to the next key/value.
316     */
317    protected void copyFromNext(SeekerState nextState) {
318      keyBuffer = nextState.keyBuffer;
319      currentKey.setKey(nextState.keyBuffer,
320        nextState.currentKey.getRowPosition() - Bytes.SIZEOF_SHORT, nextState.keyLength);
321
322      startOffset = nextState.startOffset;
323      valueOffset = nextState.valueOffset;
324      keyLength = nextState.keyLength;
325      valueLength = nextState.valueLength;
326      nextKvOffset = nextState.nextKvOffset;
327      memstoreTS = nextState.memstoreTS;
328      currentBuffer = nextState.currentBuffer;
329      tagsOffset = nextState.tagsOffset;
330      tagsLength = nextState.tagsLength;
331    }
332
333    @Override
334    public String toString() {
335      return CellUtil.getCellKeyAsString(toCell());
336    }
337
338    protected int getCellBufSize() {
339      int kvBufSize = KEY_VALUE_LEN_SIZE + keyLength + valueLength;
340      if (includesTags() && tagsLength > 0) {
341        kvBufSize += Bytes.SIZEOF_SHORT + tagsLength;
342      }
343      return kvBufSize;
344    }
345
346    public Cell toCell() {
347      Cell ret;
348      int cellBufSize = getCellBufSize();
349      long seqId = 0L;
350      if (includesMvcc()) {
351        seqId = memstoreTS;
352      }
353      if (currentBuffer.hasArray()) {
354        // TODO : reduce the varieties of KV here. Check if based on a boolean
355        // we can handle the 'no tags' case.
356        if (tagsLength > 0) {
357          // TODO : getRow len here.
358          ret = new SizeCachedKeyValue(currentBuffer.array(),
359            currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId, keyLength);
360        } else {
361          ret = new SizeCachedNoTagsKeyValue(currentBuffer.array(),
362            currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId, keyLength);
363        }
364      } else {
365        currentBuffer.asSubByteBuffer(startOffset, cellBufSize, tmpPair);
366        ByteBuffer buf = tmpPair.getFirst();
367        if (buf.isDirect()) {
368          // TODO : getRow len here.
369          ret = tagsLength > 0
370            ? new SizeCachedByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId,
371              keyLength)
372            : new SizeCachedNoTagsByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId,
373              keyLength);
374        } else {
375          if (tagsLength > 0) {
376            ret = new SizeCachedKeyValue(buf.array(), buf.arrayOffset() + tmpPair.getSecond(),
377              cellBufSize, seqId, keyLength);
378          } else {
379            ret = new SizeCachedNoTagsKeyValue(buf.array(), buf.arrayOffset() + tmpPair.getSecond(),
380              cellBufSize, seqId, keyLength);
381          }
382        }
383      }
384      return ret;
385    }
386  }
387}