001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.encoding;
018
019import java.nio.ByteBuffer;
020import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
021import org.apache.hadoop.hbase.ByteBufferKeyValue;
022import org.apache.hadoop.hbase.Cell;
023import org.apache.hadoop.hbase.CellComparator;
024import org.apache.hadoop.hbase.CellUtil;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.KeyValue;
027import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.SizeCachedKeyValue;
030import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
031import org.apache.hadoop.hbase.io.encoding.AbstractDataBlockEncoder.AbstractEncodedSeeker;
032import org.apache.hadoop.hbase.nio.ByteBuff;
033import org.apache.hadoop.hbase.util.ByteBufferUtils;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.ObjectIntPair;
036import org.apache.yetus.audience.InterfaceAudience;
037
038@InterfaceAudience.Private
039public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
040
  // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
  // many object creations.
  protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();

  // Slice of the block that holds the encoded cells (set in setCurrentBuffer). Its position is
  // the read cursor that decodeNext() advances.
  private ByteBuff currentBuffer;
  private SeekerState current = new SeekerState(); // always valid
  private SeekerState previous = new SeekerState(); // may not be valid

  // Number of entries in the row offset index, read from the block in setCurrentBuffer.
  private int rowNumber;
  // Slice holding rowNumber int offsets into currentBuffer, one per indexed row.
  private ByteBuff rowOffsets = null;
  // Comparator obtained from the decoding context's HFileContext in the constructor.
  private final CellComparator cellComparator;
052
053  public RowIndexSeekerV1(HFileBlockDecodingContext decodingCtx) {
054    super(decodingCtx);
055    this.cellComparator = decodingCtx.getHFileContext().getCellComparator();
056  }
057
058  @Override
059  public void setCurrentBuffer(ByteBuff buffer) {
060    int onDiskSize = buffer.getInt(buffer.limit() - Bytes.SIZEOF_INT);
061
062    // Data part
063    ByteBuff dup = buffer.duplicate();
064    dup.position(buffer.position());
065    dup.limit(buffer.position() + onDiskSize);
066    currentBuffer = dup.slice();
067    current.currentBuffer = currentBuffer;
068    buffer.skip(onDiskSize);
069
070    // Row offset
071    rowNumber = buffer.getInt();
072    int totalRowOffsetsLength = Bytes.SIZEOF_INT * rowNumber;
073    ByteBuff rowDup = buffer.duplicate();
074    rowDup.position(buffer.position());
075    rowDup.limit(buffer.position() + totalRowOffsetsLength);
076    rowOffsets = rowDup.slice();
077
078    decodeFirst();
079  }
080
081  @Override
082  public Cell getKey() {
083    if (current.keyBuffer.hasArray()) {
084      return new KeyValue.KeyOnlyKeyValue(current.keyBuffer.array(),
085          current.keyBuffer.arrayOffset() + current.keyBuffer.position(),
086          current.keyLength);
087    } else {
088      byte[] key = new byte[current.keyLength];
089      ByteBufferUtils.copyFromBufferToArray(key, current.keyBuffer,
090          current.keyBuffer.position(), 0, current.keyLength);
091      return new KeyValue.KeyOnlyKeyValue(key, 0, current.keyLength);
092    }
093  }
094
095  @Override
096  public ByteBuffer getValueShallowCopy() {
097    currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength,
098        tmpPair);
099    ByteBuffer dup = tmpPair.getFirst().duplicate();
100    dup.position(tmpPair.getSecond());
101    dup.limit(tmpPair.getSecond() + current.valueLength);
102    return dup.slice();
103  }
104
  /**
   * Builds a Cell for the entry the seeker is currently positioned on.
   * @return the result of {@code toCell()} on the current seeker state
   */
  @Override
  public Cell getCell() {
    return current.toCell();
  }
109
  /**
   * Moves the seeker back to the start of the current block: rewinds the data buffer and then
   * decodes the first cell again, which also invalidates the previous state.
   */
  @Override
  public void rewind() {
    currentBuffer.rewind();
    decodeFirst();
  }
115
116  @Override
117  public boolean next() {
118    if (!currentBuffer.hasRemaining()) {
119      return false;
120    }
121    decodeNext();
122    previous.invalidate();
123    return true;
124  }
125
126  private int binarySearch(Cell seekCell, boolean seekBefore) {
127    int low = 0;
128    int high = rowNumber - 1;
129    int mid = low + ((high - low) >> 1);
130    int comp = 0;
131    while (low <= high) {
132      mid = low + ((high - low) >> 1);
133      comp = this.cellComparator.compareRows(getRow(mid), seekCell);
134      if (comp < 0) {
135        low = mid + 1;
136      } else if (comp > 0) {
137        high = mid - 1;
138      } else {
139        // key found
140        if (seekBefore) {
141          return mid - 1;
142        } else {
143          return mid;
144        }
145      }
146    }
147    // key not found.
148    if (comp > 0) {
149      return mid - 1;
150    } else {
151      return mid;
152    }
153  }
154
155  private ByteBuffer getRow(int index) {
156    int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
157    ByteBuff block = currentBuffer.duplicate();
158    block.position(offset + Bytes.SIZEOF_LONG);
159    short rowLen = block.getShort();
160    block.asSubByteBuffer(block.position(), rowLen, tmpPair);
161    ByteBuffer row = tmpPair.getFirst();
162    row.position(tmpPair.getSecond()).limit(tmpPair.getSecond() + rowLen);
163    return row;
164  }
165
166  @Override
167  public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
168    previous.invalidate();
169    int index = binarySearch(seekCell, seekBefore);
170    if (index < 0) {
171      return HConstants.INDEX_KEY_MAGIC; // using optimized index key
172    } else {
173      int offset = rowOffsets.getIntAfterPosition(index * Bytes.SIZEOF_INT);
174      if (offset != 0) {
175        decodeAtPosition(offset);
176      }
177    }
178    do {
179      int comp =
180        PrivateCellUtil.compareKeyIgnoresMvcc(this.cellComparator, seekCell, current.currentKey);
181      if (comp == 0) { // exact match
182        if (seekBefore) {
183          if (!previous.isValid()) {
184            // The caller (seekBefore) has to ensure that we are not at the
185            // first key in the block.
186            throw new IllegalStateException("Cannot seekBefore if "
187                + "positioned at the first key in the block: key="
188                + Bytes.toStringBinary(seekCell.getRowArray()));
189          }
190          moveToPrevious();
191          return 1;
192        }
193        return 0;
194      }
195
196      if (comp < 0) { // already too large, check previous
197        if (previous.isValid()) {
198          moveToPrevious();
199        } else {
200          return HConstants.INDEX_KEY_MAGIC; // using optimized index key
201        }
202        return 1;
203      }
204
205      // move to next, if more data is available
206      if (currentBuffer.hasRemaining()) {
207        previous.copyFromNext(current);
208        decodeNext();
209      } else {
210        break;
211      }
212    } while (true);
213
214    // we hit the end of the block, not an exact match
215    return 1;
216  }
217
218  private void moveToPrevious() {
219    if (!previous.isValid()) {
220      throw new IllegalStateException("Can move back only once and not in first key in the block.");
221    }
222
223    SeekerState tmp = previous;
224    previous = current;
225    current = tmp;
226
227    // move after last key value
228    currentBuffer.position(current.nextKvOffset);
229    previous.invalidate();
230  }
231
  /**
   * Compares the given key with the key of the current cell, ignoring mvcc.
   * @param comparator comparator to use for the comparison
   * @param key the key passed as the left-hand operand; the current cell's key is the right-hand
   *          operand
   * @return the result of {@code PrivateCellUtil.compareKeyIgnoresMvcc} for those operands
   */
  @Override
  public int compareKey(CellComparator comparator, Cell key) {
    return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, current.currentKey);
  }
236
  /**
   * Decodes the cell at the data buffer's current position and invalidates the previous state.
   * Called after the buffer has been (re)set to its start.
   */
  protected void decodeFirst() {
    decodeNext();
    previous.invalidate();
  }
241
  /**
   * Moves the data buffer to {@code position}, decodes the cell found there and invalidates the
   * previous state.
   * @param position offset inside the data buffer at which a cell starts
   */
  protected void decodeAtPosition(int position) {
    currentBuffer.position(position);
    decodeNext();
    previous.invalidate();
  }
247
248  protected void decodeNext() {
249    current.startOffset = currentBuffer.position();
250    long ll = currentBuffer.getLongAfterPosition(0);
251    // Read top half as an int of key length and bottom int as value length
252    current.keyLength = (int) (ll >> Integer.SIZE);
253    current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
254    currentBuffer.skip(Bytes.SIZEOF_LONG);
255    // key part
256    currentBuffer.asSubByteBuffer(currentBuffer.position(), current.keyLength,
257        tmpPair);
258    ByteBuffer key = tmpPair.getFirst().duplicate();
259    key.position(tmpPair.getSecond()).limit(
260        tmpPair.getSecond() + current.keyLength);
261    current.keyBuffer = key;
262    currentBuffer.skip(current.keyLength);
263    // value part
264    current.valueOffset = currentBuffer.position();
265    currentBuffer.skip(current.valueLength);
266    if (includesTags()) {
267      decodeTags();
268    }
269    if (includesMvcc()) {
270      current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
271    } else {
272      current.memstoreTS = 0;
273    }
274    current.nextKvOffset = currentBuffer.position();
275    current.currentKey.setKey(current.keyBuffer, tmpPair.getSecond(),
276        current.keyLength);
277  }
278
279  protected void decodeTags() {
280    current.tagsLength = currentBuffer.getShortAfterPosition(0);
281    currentBuffer.skip(Bytes.SIZEOF_SHORT);
282    current.tagsOffset = currentBuffer.position();
283    currentBuffer.skip(current.tagsLength);
284  }
285
286  private class SeekerState {
287    /**
288     * The size of a (key length, value length) tuple that prefixes each entry
289     * in a data block.
290     */
291    public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
292
293    protected ByteBuff currentBuffer;
294    protected int startOffset = -1;
295    protected int valueOffset = -1;
296    protected int keyLength;
297    protected int valueLength;
298    protected int tagsLength = 0;
299    protected int tagsOffset = -1;
300
301    protected ByteBuffer keyBuffer = null;
302    protected long memstoreTS;
303    protected int nextKvOffset;
304    // buffer backed keyonlyKV
305    private ByteBufferKeyOnlyKeyValue currentKey = new ByteBufferKeyOnlyKeyValue();
306
307    protected boolean isValid() {
308      return valueOffset != -1;
309    }
310
311    protected void invalidate() {
312      valueOffset = -1;
313      currentKey = new ByteBufferKeyOnlyKeyValue();
314      currentBuffer = null;
315    }
316
317    /**
318     * Copy the state from the next one into this instance (the previous state placeholder). Used to
319     * save the previous state when we are advancing the seeker to the next key/value.
320     */
321    protected void copyFromNext(SeekerState nextState) {
322      keyBuffer = nextState.keyBuffer;
323      currentKey.setKey(nextState.keyBuffer,
324          nextState.currentKey.getRowPosition() - Bytes.SIZEOF_SHORT,
325          nextState.keyLength);
326
327      startOffset = nextState.startOffset;
328      valueOffset = nextState.valueOffset;
329      keyLength = nextState.keyLength;
330      valueLength = nextState.valueLength;
331      nextKvOffset = nextState.nextKvOffset;
332      memstoreTS = nextState.memstoreTS;
333      currentBuffer = nextState.currentBuffer;
334      tagsOffset = nextState.tagsOffset;
335      tagsLength = nextState.tagsLength;
336    }
337
338    @Override
339    public String toString() {
340      return CellUtil.getCellKeyAsString(toCell());
341    }
342
343    protected int getCellBufSize() {
344      int kvBufSize = KEY_VALUE_LEN_SIZE + keyLength + valueLength;
345      if (includesTags() && tagsLength > 0) {
346        kvBufSize += Bytes.SIZEOF_SHORT + tagsLength;
347      }
348      return kvBufSize;
349    }
350
351    public Cell toCell() {
352      Cell ret;
353      int cellBufSize = getCellBufSize();
354      long seqId = 0L;
355      if (includesMvcc()) {
356        seqId = memstoreTS;
357      }
358      if (currentBuffer.hasArray()) {
359        // TODO : reduce the varieties of KV here. Check if based on a boolean
360        // we can handle the 'no tags' case.
361        if (tagsLength > 0) {
362          ret = new SizeCachedKeyValue(currentBuffer.array(),
363              currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId);
364        } else {
365          ret = new SizeCachedNoTagsKeyValue(currentBuffer.array(),
366              currentBuffer.arrayOffset() + startOffset, cellBufSize, seqId);
367        }
368      } else {
369        currentBuffer.asSubByteBuffer(startOffset, cellBufSize, tmpPair);
370        ByteBuffer buf = tmpPair.getFirst();
371        if (buf.isDirect()) {
372          ret =
373              tagsLength > 0 ? new ByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId)
374                  : new NoTagsByteBufferKeyValue(buf, tmpPair.getSecond(), cellBufSize, seqId);
375        } else {
376          if (tagsLength > 0) {
377            ret = new SizeCachedKeyValue(buf.array(), buf.arrayOffset()
378                + tmpPair.getSecond(), cellBufSize, seqId);
379          } else {
380            ret = new SizeCachedNoTagsKeyValue(buf.array(), buf.arrayOffset()
381                + tmpPair.getSecond(), cellBufSize, seqId);
382          }
383        }
384      }
385      return ret;
386    }
387  }
388}