001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import org.apache.commons.lang3.NotImplementedException;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.CellComparator;
025import org.apache.hadoop.hbase.CellUtil;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * ReversedKeyValueHeap is used for supporting reversed scanning. Compared with KeyValueHeap, its
031 * scanner comparator is a little different (see ReversedKVScannerComparator), all seek is backward
032 * seek(see {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row if it is
033 * already at the end of one row when calling next().
034 */
035@InterfaceAudience.Private
036public class ReversedKeyValueHeap extends KeyValueHeap {
037
038  /**
039   *   */
040  public ReversedKeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
041    throws IOException {
042    super(scanners, new ReversedKVScannerComparator(comparator));
043  }
044
045  @Override
046  public boolean seek(Cell seekKey) throws IOException {
047    throw new IllegalStateException("seek cannot be called on ReversedKeyValueHeap");
048  }
049
050  @Override
051  public boolean reseek(Cell seekKey) throws IOException {
052    throw new IllegalStateException("reseek cannot be called on ReversedKeyValueHeap");
053  }
054
055  @Override
056  public boolean requestSeek(Cell key, boolean forward, boolean useBloom) throws IOException {
057    throw new IllegalStateException("requestSeek cannot be called on ReversedKeyValueHeap");
058  }
059
060  @Override
061  public boolean seekToPreviousRow(Cell seekKey) throws IOException {
062    if (current == null) {
063      return false;
064    }
065    heap.add(current);
066    current = null;
067
068    KeyValueScanner scanner;
069    while ((scanner = heap.poll()) != null) {
070      Cell topKey = scanner.peek();
071      if (comparator.getComparator().compareRows(topKey, seekKey) < 0) {
072        // Row of Top KeyValue is before Seek row.
073        heap.add(scanner);
074        current = pollRealKV();
075        return current != null;
076      }
077
078      if (!scanner.seekToPreviousRow(seekKey)) {
079        this.scannersForDelayedClose.add(scanner);
080      } else {
081        heap.add(scanner);
082      }
083    }
084
085    // Heap is returning empty, scanner is done
086    return false;
087  }
088
089  @Override
090  public boolean backwardSeek(Cell seekKey) throws IOException {
091    if (current == null) {
092      return false;
093    }
094    heap.add(current);
095    current = null;
096
097    KeyValueScanner scanner;
098    while ((scanner = heap.poll()) != null) {
099      Cell topKey = scanner.peek();
100      if (
101        (CellUtil.matchingRows(seekKey, topKey)
102          && comparator.getComparator().compare(seekKey, topKey) <= 0)
103          || comparator.getComparator().compareRows(seekKey, topKey) > 0
104      ) {
105        heap.add(scanner);
106        current = pollRealKV();
107        return current != null;
108      }
109      if (!scanner.backwardSeek(seekKey)) {
110        this.scannersForDelayedClose.add(scanner);
111      } else {
112        heap.add(scanner);
113      }
114    }
115    return false;
116  }
117
118  @Override
119  public Cell next() throws IOException {
120    if (this.current == null) {
121      return null;
122    }
123    Cell kvReturn = this.current.next();
124    Cell kvNext = this.current.peek();
125    if (kvNext == null || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) {
126      if (this.current.seekToPreviousRow(kvReturn)) {
127        this.heap.add(this.current);
128      } else {
129        this.scannersForDelayedClose.add(this.current);
130      }
131      this.current = null;
132      this.current = pollRealKV();
133    } else {
134      KeyValueScanner topScanner = this.heap.peek();
135      if (topScanner != null && this.comparator.compare(this.current, topScanner) > 0) {
136        this.heap.add(this.current);
137        this.current = null;
138        this.current = pollRealKV();
139      }
140    }
141    return kvReturn;
142  }
143
144  /**
145   * In ReversedKVScannerComparator, we compare the row of scanners' peek values first, sort bigger
146   * one before the smaller one. Then compare the KeyValue if they have the equal row, sort smaller
147   * one before the bigger one
148   */
149  private static class ReversedKVScannerComparator extends KVScannerComparator {
150
151    /**
152     * Constructor
153     */
154    public ReversedKVScannerComparator(CellComparator kvComparator) {
155      super(kvComparator);
156    }
157
158    @Override
159    public int compare(KeyValueScanner left, KeyValueScanner right) {
160      int rowComparison = compareRows(left.peek(), right.peek());
161      if (rowComparison != 0) {
162        return -rowComparison;
163      }
164      return super.compare(left, right);
165    }
166
167    /**
168     * Compares rows of two KeyValue
169     * @return less than 0 if left is smaller, 0 if equal etc..
170     */
171    public int compareRows(Cell left, Cell right) {
172      return super.kvComparator.compareRows(left, right);
173    }
174  }
175
176  @Override
177  public boolean seekToLastRow() throws IOException {
178    throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
179  }
180}