1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.List;
23
24 import org.apache.commons.lang.NotImplementedException;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.Cell;
27 import org.apache.hadoop.hbase.CellUtil;
28 import org.apache.hadoop.hbase.KeyValue.KVComparator;
29
30
31
32
33
34
35
36
37 @InterfaceAudience.Private
38 public class ReversedKeyValueHeap extends KeyValueHeap {
39
40
41
42
43
44
45 public ReversedKeyValueHeap(List<? extends KeyValueScanner> scanners,
46 KVComparator comparator) throws IOException {
47 super(scanners, new ReversedKVScannerComparator(comparator));
48 }
49
50 @Override
51 public boolean seek(Cell seekKey) throws IOException {
52 throw new IllegalStateException(
53 "seek cannot be called on ReversedKeyValueHeap");
54 }
55
56 @Override
57 public boolean reseek(Cell seekKey) throws IOException {
58 throw new IllegalStateException(
59 "reseek cannot be called on ReversedKeyValueHeap");
60 }
61
62 @Override
63 public boolean requestSeek(Cell key, boolean forward, boolean useBloom)
64 throws IOException {
65 throw new IllegalStateException(
66 "requestSeek cannot be called on ReversedKeyValueHeap");
67 }
68
69 @Override
70 public boolean seekToPreviousRow(Cell seekKey) throws IOException {
71 if (current == null) {
72 return false;
73 }
74 heap.add(current);
75 current = null;
76
77 KeyValueScanner scanner;
78 while ((scanner = heap.poll()) != null) {
79 Cell topKey = scanner.peek();
80 if (comparator.getComparator().compareRows(topKey.getRowArray(),
81 topKey.getRowOffset(), topKey.getRowLength(), seekKey.getRowArray(),
82 seekKey.getRowOffset(), seekKey.getRowLength()) < 0) {
83
84 heap.add(scanner);
85 current = pollRealKV();
86 return current != null;
87 }
88
89 if (!scanner.seekToPreviousRow(seekKey)) {
90 scanner.close();
91 } else {
92 heap.add(scanner);
93 }
94 }
95
96
97 return false;
98 }
99
100 @Override
101 public boolean backwardSeek(Cell seekKey) throws IOException {
102 if (current == null) {
103 return false;
104 }
105 heap.add(current);
106 current = null;
107
108 KeyValueScanner scanner;
109 while ((scanner = heap.poll()) != null) {
110 Cell topKey = scanner.peek();
111 if ((CellUtil.matchingRow(seekKey, topKey) && comparator
112 .getComparator().compare(seekKey, topKey) <= 0)
113 || comparator.getComparator().compareRows(seekKey, topKey) > 0) {
114 heap.add(scanner);
115 current = pollRealKV();
116 return current != null;
117 }
118 if (!scanner.backwardSeek(seekKey)) {
119 scanner.close();
120 } else {
121 heap.add(scanner);
122 }
123 }
124 return false;
125 }
126
127 @Override
128 public Cell next() throws IOException {
129 if (this.current == null) {
130 return null;
131 }
132 Cell kvReturn = this.current.next();
133 Cell kvNext = this.current.peek();
134 if (kvNext == null
135 || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) {
136 if (this.current.seekToPreviousRow(kvReturn)) {
137 this.heap.add(this.current);
138 } else {
139 this.current.close();
140 }
141 this.current = null;
142 this.current = pollRealKV();
143 } else {
144 KeyValueScanner topScanner = this.heap.peek();
145 if (topScanner != null
146 && this.comparator.compare(this.current, topScanner) > 0) {
147 this.heap.add(this.current);
148 this.current = null;
149 this.current = pollRealKV();
150 }
151 }
152 return kvReturn;
153 }
154
155
156
157
158
159
160 private static class ReversedKVScannerComparator extends
161 KVScannerComparator {
162
163
164
165
166
167 public ReversedKVScannerComparator(KVComparator kvComparator) {
168 super(kvComparator);
169 }
170
171 @Override
172 public int compare(KeyValueScanner left, KeyValueScanner right) {
173 int rowComparison = compareRows(left.peek(), right.peek());
174 if (rowComparison != 0) {
175 return -rowComparison;
176 }
177 return super.compare(left, right);
178 }
179
180
181
182
183
184
185
186 public int compareRows(Cell left, Cell right) {
187 return super.kvComparator.compareRows(left, right);
188 }
189 }
190
191 @Override
192 public boolean seekToLastRow() throws IOException {
193 throw new NotImplementedException("Not implemented");
194 }
195 }