001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import java.nio.ByteBuffer;
021import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
022import org.apache.hadoop.hbase.Cell;
023import org.apache.hadoop.hbase.CellComparator;
024import org.apache.hadoop.hbase.CellUtil;
025import org.apache.hadoop.hbase.ExtendedCell;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.KeyValue;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.SizeCachedByteBufferKeyValue;
030import org.apache.hadoop.hbase.SizeCachedKeyValue;
031import org.apache.hadoop.hbase.SizeCachedNoTagsByteBufferKeyValue;
032import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
033import org.apache.hadoop.hbase.io.encoding.AbstractDataBlockEncoder.AbstractEncodedSeeker;
034import org.apache.hadoop.hbase.nio.ByteBuff;
035import org.apache.hadoop.hbase.util.ByteBufferUtils;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.ObjectIntPair;
038import org.apache.yetus.audience.InterfaceAudience;
039
040@InterfaceAudience.Private
041public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
042
  // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
  // many object creations. Because it is shared, its contents are only meaningful until the next
  // asSubByteBuffer call made by this seeker.
  protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();

  // Data part of the block being decoded; sliced out of the block in setCurrentBuffer. Its
  // position is advanced as cells are decoded.
  private ByteBuff currentBuffer;
  private SeekerState current = new SeekerState(); // always valid
  private SeekerState previous = new SeekerState(); // may not be valid

  // Number of entries in the row offset index, read from the block in setCurrentBuffer.
  private int rowNumber;
  // Row offset index: rowNumber ints, each used as a position into currentBuffer.
  private ByteBuff rowOffsets = null;
  // Comparator obtained from the decoding context's HFileContext; used for row and key compares.
  private final CellComparator cellComparator;
054
055  public RowIndexSeekerV1(HFileBlockDecodingContext decodingCtx) {
056    super(decodingCtx);
057    this.cellComparator = decodingCtx.getHFileContext().getCellComparator();
058  }
059
060  @Override
061  public void setCurrentBuffer(ByteBuff buffer) {
062    int onDiskSize = buffer.getInt(buffer.limit() - Bytes.SIZEOF_INT);
063
064    // Data part
065    ByteBuff dup = buffer.duplicate();
066    dup.position(buffer.position());
067    dup.limit(buffer.position() + onDiskSize);
068    currentBuffer = dup.slice();
069    current.currentBuffer = currentBuffer;
070    buffer.skip(onDiskSize);
071
072    // Row offset
073    rowNumber = buffer.getInt();
074    int totalRowOffsetsLength = Bytes.SIZEOF_INT * rowNumber;
075    ByteBuff rowDup = buffer.duplicate();
076    rowDup.position(buffer.position());
077    rowDup.limit(buffer.position() + totalRowOffsetsLength);
078    rowOffsets = rowDup.slice();
079
080    decodeFirst();
081  }
082
083  @Override
084  @SuppressWarnings("ByteBufferBackingArray")
085  public ExtendedCell getKey() {
086    if (current.keyBuffer.hasArray()) {
087      return new KeyValue.KeyOnlyKeyValue(current.keyBuffer.array(),
088        current.keyBuffer.arrayOffset() + current.keyBuffer.position(), current.keyLength);
089    } else {
090      final byte[] key = new byte[current.keyLength];
091      ByteBufferUtils.copyFromBufferToArray(key, current.keyBuffer, current.keyBuffer.position(), 0,
092        current.keyLength);
093      return new KeyValue.KeyOnlyKeyValue(key, 0, current.keyLength);
094    }
095  }
096
097  @Override
098  public ByteBuffer getValueShallowCopy() {
099    currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
100    ByteBuffer dup = tmpPair.getFirst().duplicate();
101    dup.position(tmpPair.getSecond());
102    dup.limit(tmpPair.getSecond() + current.valueLength);
103    return dup.slice();
104  }
105
106  @Override
107  public ExtendedCell getCell() {
108    return current.toCell();
109  }
110
  /**
   * Rewinds the data buffer and re-decodes the cell found there via {@link #decodeFirst()},
   * which also invalidates the remembered previous state.
   */
  @Override
  public void rewind() {
    currentBuffer.rewind();
    decodeFirst();
  }
116
117  @Override
118  public boolean next() {
119    if (!currentBuffer.hasRemaining()) {
120      return false;
121    }
122    decodeNext();
123    previous.invalidate();
124    return true;
125  }
126
127  private int binarySearch(Cell seekCell, boolean seekBefore) {
128    int low = 0;
129    int high = rowNumber - 1;
130    int mid = low + ((high - low) >> 1);
131    int comp = 0;
132    while (low <= high) {
133      mid = low + ((high - low) >> 1);
134      comp = this.cellComparator.compareRows(getRow(mid), seekCell);
135      if (comp < 0) {
136        low = mid + 1;
137      } else if (comp > 0) {
138        high = mid - 1;
139      } else {
140        // key found
141        if (seekBefore) {
142          return mid - 1;
143        } else {
144          return mid;
145        }
146      }
147    }
148    // key not found.
149    if (comp > 0) {
150      return mid - 1;
151    } else {
152      return mid;
153    }
154  }
155
156  private ByteBuffer getRow(int index) {
157    int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
158    ByteBuff block = currentBuffer.duplicate();
159    block.position(offset + Bytes.SIZEOF_LONG);
160    short rowLen = block.getShort();
161    block.asSubByteBuffer(block.position(), rowLen, tmpPair);
162    ByteBuffer row = tmpPair.getFirst();
163    row.position(tmpPair.getSecond()).limit(tmpPair.getSecond() + rowLen);
164    return row;
165  }
166
167  @Override
168  public int seekToKeyInBlock(ExtendedCell seekCell, boolean seekBefore) {
169    previous.invalidate();
170    int index = binarySearch(seekCell, seekBefore);
171    if (index < 0) {
172      return HConstants.INDEX_KEY_MAGIC; // using optimized index key
173    } else {
174      int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
175      if (offset != 0) {
176        decodeAtPosition(offset);
177      }
178    }
179    do {
180      int comp =
181        PrivateCellUtil.compareKeyIgnoresMvcc(this.cellComparator, seekCell, current.currentKey);
182      if (comp == 0) { // exact match
183        if (seekBefore) {
184          if (!previous.isValid()) {
185            // The caller (seekBefore) has to ensure that we are not at the
186            // first key in the block.
187            throw new IllegalStateException(
188              "Cannot seekBefore if " + "positioned at the first key in the block: key="
189                + Bytes.toStringBinary(seekCell.getRowArray()));
190          }
191          moveToPrevious();
192          return 1;
193        }
194        return 0;
195      }
196
197      if (comp < 0) { // already too large, check previous
198        if (previous.isValid()) {
199          moveToPrevious();
200        } else {
201          return HConstants.INDEX_KEY_MAGIC; // using optimized index key
202        }
203        return 1;
204      }
205
206      // move to next, if more data is available
207      if (currentBuffer.hasRemaining()) {
208        previous.copyFromNext(current);
209        decodeNext();
210      } else {
211        break;
212      }
213    } while (true);
214
215    // we hit the end of the block, not an exact match
216    return 1;
217  }
218
219  private void moveToPrevious() {
220    if (!previous.isValid()) {
221      throw new IllegalStateException("Can move back only once and not in first key in the block.");
222    }
223
224    SeekerState tmp = previous;
225    previous = current;
226    current = tmp;
227
228    // move after last key value
229    currentBuffer.position(current.nextKvOffset);
230    previous.invalidate();
231  }
232
233  @Override
234  public int compareKey(CellComparator comparator, ExtendedCell key) {
235    return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, current.currentKey);
236  }
237
  /**
   * Decodes the cell at the data buffer's present position into the current state and
   * invalidates the previous state, since nothing has been remembered before this cell.
   */
  protected void decodeFirst() {
    decodeNext();
    previous.invalidate();
  }
242
  /**
   * Moves the data buffer to {@code position}, decodes the cell found there into the current
   * state and invalidates the previous state.
   * @param position position within the data buffer at which a cell entry starts
   */
  protected void decodeAtPosition(int position) {
    currentBuffer.position(position);
    decodeNext();
    previous.invalidate();
  }
248
249  protected void decodeNext() {
250    current.startOffset = currentBuffer.position();
251    long ll = currentBuffer.getLongAfterPosition(0);
252    // Read top half as an int of key length and bottom int as value length
253    current.keyLength = (int) (ll >> Integer.SIZE);
254    current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
255    currentBuffer.skip(Bytes.SIZEOF_LONG);
256    // key part
257    currentBuffer.asSubByteBuffer(currentBuffer.position(), current.keyLength, tmpPair);
258    ByteBuffer key = tmpPair.getFirst().duplicate();
259    key.position(tmpPair.getSecond()).limit(tmpPair.getSecond() + current.keyLength);
260    current.keyBuffer = key;
261    currentBuffer.skip(current.keyLength);
262    // value part
263    current.valueOffset = currentBuffer.position();
264    currentBuffer.skip(current.valueLength);
265    if (includesTags()) {
266      decodeTags();
267    }
268    if (includesMvcc()) {
269      current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
270    } else {
271      current.memstoreTS = 0;
272    }
273    current.nextKvOffset = currentBuffer.position();
274    current.currentKey.setKey(current.keyBuffer, tmpPair.getSecond(), current.keyLength);
275  }
276
277  protected void decodeTags() {
278    current.tagsLength = currentBuffer.getShortAfterPosition(0);
279    currentBuffer.skip(Bytes.SIZEOF_SHORT);
280    current.tagsOffset = currentBuffer.position();
281    currentBuffer.skip(current.tagsLength);
282  }
283
284  private class SeekerState {
285    /**
286     * The size of a (key length, value length) tuple that prefixes each entry in a data block.
287     */
288    public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
289
290    protected ByteBuff currentBuffer;
291    protected int startOffset = -1;
292    protected int valueOffset = -1;
293    protected int keyLength;
294    protected int valueLength;
295    protected int tagsLength = 0;
296    protected int tagsOffset = -1;
297
298    protected ByteBuffer keyBuffer = null;
299    protected long memstoreTS;
300    protected int nextKvOffset;
301    // buffer backed keyonlyKV
302    private ByteBufferKeyOnlyKeyValue currentKey = new ByteBufferKeyOnlyKeyValue();
303
304    protected boolean isValid() {
305      return valueOffset != -1;
306    }
307
308    protected void invalidate() {
309      valueOffset = -1;
310      currentKey = new ByteBufferKeyOnlyKeyValue();
311      currentBuffer = null;
312    }
313
314    /**
315     * Copy the state from the next one into this instance (the previous state placeholder). Used to
316     * save the previous state when we are advancing the seeker to the next key/value.
317     */
318    protected void copyFromNext(SeekerState nextState) {
319      keyBuffer = nextState.keyBuffer;
320      currentKey.setKey(nextState.keyBuffer,
321        nextState.currentKey.getRowPosition() - Bytes.SIZEOF_SHORT, nextState.keyLength);
322
323      startOffset = nextState.startOffset;
324      valueOffset = nextState.valueOffset;
325      keyLength = nextState.keyLength;
326      valueLength = nextState.valueLength;
327      nextKvOffset = nextState.nextKvOffset;
328      memstoreTS = nextState.memstoreTS;
329      currentBuffer = nextState.currentBuffer;
330      tagsOffset = nextState.tagsOffset;
331      tagsLength = nextState.tagsLength;
332    }
333
334    @Override
335    public String toString() {
336      return CellUtil.getCellKeyAsString(toCell());
337    }
338
339    protected int getCellBufSize() {
340      int kvBufSize = KEY_VALUE_LEN_SIZE + keyLength + valueLength;
341      if (includesTags() && tagsLength > 0) {
342        kvBufSize += Bytes.SIZEOF_SHORT + tagsLength;
343      }
344      return kvBufSize;
345    }
346
347    public ExtendedCell toCell() {
348      ExtendedCell ret;
349      int cellBufSize = getCellBufSize();
350      long seqId = 0L;
351      if (includesMvcc()) {
352        seqId = memstoreTS;
353      }
354      if (currentBuffer.hasArray()) {
355        // TODO : reduce the varieties of KV here. Check if based on a boolean
356        // we can handle the 'no tags' case.
357        if (tagsLength > 0) {
358          // TODO : getRow len here.
359          ret = new SizeCachedKeyValue(currentBuffer.array(),
360            currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId, keyLength);
361        } else {
362          ret = new SizeCachedNoTagsKeyValue(currentBuffer.array(),
363            currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId, keyLength);
364        }
365      } else {
366        currentBuffer.asSubByteBuffer(startOffset, cellBufSize, tmpPair);
367        ByteBuffer buf = tmpPair.getFirst();
368        if (buf.isDirect()) {
369          // TODO : getRow len here.
370          ret = tagsLength > 0
371            ? new SizeCachedByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId,
372              keyLength)
373            : new SizeCachedNoTagsByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId,
374              keyLength);
375        } else {
376          if (tagsLength > 0) {
377            ret = new SizeCachedKeyValue(buf.array(), buf.arrayOffset() + tmpPair.getSecond(),
378              cellBufSize, seqId, keyLength);
379          } else {
380            ret = new SizeCachedNoTagsKeyValue(buf.array(), buf.arrayOffset() + tmpPair.getSecond(),
381              cellBufSize, seqId, keyLength);
382          }
383        }
384      }
385      return ret;
386    }
387  }
388}