001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import org.apache.commons.lang3.NotImplementedException;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.CellComparator;
025import org.apache.hadoop.hbase.CellUtil;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * ReversedKeyValueHeap is used for supporting reversed scanning. Compared with KeyValueHeap, its
031 * scanner comparator is a little different (see ReversedKVScannerComparator), all seek is backward
032 * seek(see {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row if it is
033 * already at the end of one row when calling next().
034 */
035@InterfaceAudience.Private
036public class ReversedKeyValueHeap extends KeyValueHeap {
037
038  /**
039   * nnn
040   */
041  public ReversedKeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
042    throws IOException {
043    super(scanners, new ReversedKVScannerComparator(comparator));
044  }
045
046  @Override
047  public boolean seek(Cell seekKey) throws IOException {
048    throw new IllegalStateException("seek cannot be called on ReversedKeyValueHeap");
049  }
050
051  @Override
052  public boolean reseek(Cell seekKey) throws IOException {
053    throw new IllegalStateException("reseek cannot be called on ReversedKeyValueHeap");
054  }
055
056  @Override
057  public boolean requestSeek(Cell key, boolean forward, boolean useBloom) throws IOException {
058    throw new IllegalStateException("requestSeek cannot be called on ReversedKeyValueHeap");
059  }
060
061  @Override
062  public boolean seekToPreviousRow(Cell seekKey) throws IOException {
063    if (current == null) {
064      return false;
065    }
066    heap.add(current);
067    current = null;
068
069    KeyValueScanner scanner;
070    while ((scanner = heap.poll()) != null) {
071      Cell topKey = scanner.peek();
072      if (comparator.getComparator().compareRows(topKey, seekKey) < 0) {
073        // Row of Top KeyValue is before Seek row.
074        heap.add(scanner);
075        current = pollRealKV();
076        return current != null;
077      }
078
079      if (!scanner.seekToPreviousRow(seekKey)) {
080        this.scannersForDelayedClose.add(scanner);
081      } else {
082        heap.add(scanner);
083      }
084    }
085
086    // Heap is returning empty, scanner is done
087    return false;
088  }
089
090  @Override
091  public boolean backwardSeek(Cell seekKey) throws IOException {
092    if (current == null) {
093      return false;
094    }
095    heap.add(current);
096    current = null;
097
098    KeyValueScanner scanner;
099    while ((scanner = heap.poll()) != null) {
100      Cell topKey = scanner.peek();
101      if (
102        (CellUtil.matchingRows(seekKey, topKey)
103          && comparator.getComparator().compare(seekKey, topKey) <= 0)
104          || comparator.getComparator().compareRows(seekKey, topKey) > 0
105      ) {
106        heap.add(scanner);
107        current = pollRealKV();
108        return current != null;
109      }
110      if (!scanner.backwardSeek(seekKey)) {
111        this.scannersForDelayedClose.add(scanner);
112      } else {
113        heap.add(scanner);
114      }
115    }
116    return false;
117  }
118
119  @Override
120  public Cell next() throws IOException {
121    if (this.current == null) {
122      return null;
123    }
124    Cell kvReturn = this.current.next();
125    Cell kvNext = this.current.peek();
126    if (kvNext == null || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) {
127      if (this.current.seekToPreviousRow(kvReturn)) {
128        this.heap.add(this.current);
129      } else {
130        this.scannersForDelayedClose.add(this.current);
131      }
132      this.current = null;
133      this.current = pollRealKV();
134    } else {
135      KeyValueScanner topScanner = this.heap.peek();
136      if (topScanner != null && this.comparator.compare(this.current, topScanner) > 0) {
137        this.heap.add(this.current);
138        this.current = null;
139        this.current = pollRealKV();
140      }
141    }
142    return kvReturn;
143  }
144
145  /**
146   * In ReversedKVScannerComparator, we compare the row of scanners' peek values first, sort bigger
147   * one before the smaller one. Then compare the KeyValue if they have the equal row, sort smaller
148   * one before the bigger one
149   */
150  private static class ReversedKVScannerComparator extends KVScannerComparator {
151
152    /**
153     * Constructor n
154     */
155    public ReversedKVScannerComparator(CellComparator kvComparator) {
156      super(kvComparator);
157    }
158
159    @Override
160    public int compare(KeyValueScanner left, KeyValueScanner right) {
161      int rowComparison = compareRows(left.peek(), right.peek());
162      if (rowComparison != 0) {
163        return -rowComparison;
164      }
165      return super.compare(left, right);
166    }
167
168    /**
169     * Compares rows of two KeyValue nn * @return less than 0 if left is smaller, 0 if equal etc..
170     */
171    public int compareRows(Cell left, Cell right) {
172      return super.kvComparator.compareRows(left, right);
173    }
174  }
175
176  @Override
177  public boolean seekToLastRow() throws IOException {
178    throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
179  }
180}