1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.SortedSet;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.KeyValueUtil;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
39 import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
40
41
42
43
44
45 @InterfaceAudience.LimitedPrivate("Coprocessor")
46 public class StoreFileScanner implements KeyValueScanner {
47 static final Log LOG = LogFactory.getLog(HStore.class);
48
49
50 private final StoreFile.Reader reader;
51 private final HFileScanner hfs;
52 private Cell cur = null;
53
54 private boolean realSeekDone;
55 private boolean delayedReseek;
56 private Cell delayedSeekKV;
57
58 private boolean enforceMVCC = false;
59 private boolean hasMVCCInfo = false;
60
61
62 private boolean stopSkippingKVsIfNextRow = false;
63
64 private static AtomicLong seekCount;
65
66 private ScanQueryMatcher matcher;
67
68 private long readPt;
69
70
71
72
73
/**
 * Creates a scanner on top of the given HFile scanner.
 *
 * @param reader  reader of the store file being scanned
 * @param hfs     HFile scanner used for all positioning
 * @param useMVCC if true, cells with an MVCC version above {@code readPt} are skipped
 * @param hasMVCC stored as {@code hasMVCCInfo}; presumably whether the file carries MVCC
 *                information (confirm against callers)
 * @param readPt  read point compared against each cell's MVCC version
 */
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
    boolean hasMVCC, long readPt) {
  this.reader = reader;
  this.hfs = hfs;
  this.enforceMVCC = useMVCC;
  this.hasMVCCInfo = hasMVCC;
  this.readPt = readPt;
}
82
83
84
85
86
87 public static List<StoreFileScanner> getScannersForStoreFiles(
88 Collection<StoreFile> files,
89 boolean cacheBlocks,
90 boolean usePread, long readPt) throws IOException {
91 return getScannersForStoreFiles(files, cacheBlocks,
92 usePread, false, readPt);
93 }
94
95
96
97
98 public static List<StoreFileScanner> getScannersForStoreFiles(
99 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
100 boolean isCompaction, long readPt) throws IOException {
101 return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
102 null, readPt);
103 }
104
105
106
107
108
109
110 public static List<StoreFileScanner> getScannersForStoreFiles(
111 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
112 boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
113 List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
114 files.size());
115 for (StoreFile file : files) {
116 StoreFile.Reader r = file.createReader();
117 StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
118 isCompaction, readPt);
119 scanner.setScanQueryMatcher(matcher);
120 scanners.add(scanner);
121 }
122 return scanners;
123 }
124
125 public String toString() {
126 return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
127 }
128
129 public Cell peek() {
130 return cur;
131 }
132
133 public Cell next() throws IOException {
134 Cell retKey = cur;
135
136 try {
137
138 if (cur != null) {
139 hfs.next();
140 setCurrentCell(hfs.getKeyValue());
141 if (hasMVCCInfo || this.reader.isBulkLoaded()) {
142 skipKVsNewerThanReadpoint();
143 }
144 }
145 } catch(IOException e) {
146 throw new IOException("Could not iterate " + this, e);
147 }
148 return retKey;
149 }
150
151 public boolean seek(Cell key) throws IOException {
152 if (seekCount != null) seekCount.incrementAndGet();
153
154 try {
155 try {
156 if(!seekAtOrAfter(hfs, key)) {
157 close();
158 return false;
159 }
160
161 setCurrentCell(hfs.getKeyValue());
162
163 if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
164 return skipKVsNewerThanReadpoint();
165 } else {
166 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
167 }
168 } finally {
169 realSeekDone = true;
170 }
171 } catch (IOException ioe) {
172 throw new IOException("Could not seek " + this + " to key " + key, ioe);
173 }
174 }
175
176 public boolean reseek(Cell key) throws IOException {
177 if (seekCount != null) seekCount.incrementAndGet();
178
179 try {
180 try {
181 if (!reseekAtOrAfter(hfs, key)) {
182 close();
183 return false;
184 }
185 setCurrentCell(hfs.getKeyValue());
186
187 if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
188 return skipKVsNewerThanReadpoint();
189 } else {
190 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
191 }
192 } finally {
193 realSeekDone = true;
194 }
195 } catch (IOException ioe) {
196 throw new IOException("Could not reseek " + this + " to key " + key,
197 ioe);
198 }
199 }
200
201 protected void setCurrentCell(Cell newVal) throws IOException {
202 this.cur = newVal;
203 if (this.cur != null && this.reader.isBulkLoaded()) {
204 CellUtil.setSequenceId(cur, this.reader.getSequenceID());
205 }
206 }
207
208 protected boolean skipKVsNewerThanReadpoint() throws IOException {
209
210
211 Cell startKV = cur;
212 while(enforceMVCC
213 && cur != null
214 && (cur.getMvccVersion() > readPt)) {
215 hfs.next();
216 setCurrentCell(hfs.getKeyValue());
217 if (this.stopSkippingKVsIfNextRow
218 && getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
219 cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
220 startKV.getRowLength()) > 0) {
221 return false;
222 }
223 }
224
225 if (cur == null) {
226 close();
227 return false;
228 }
229
230 return true;
231 }
232
233 public void close() {
234
235 cur = null;
236 }
237
238
239
240
241
242
243
244
245 public static boolean seekAtOrAfter(HFileScanner s, Cell k)
246 throws IOException {
247 int result = s.seekTo(k);
248 if(result < 0) {
249 if (result == HConstants.INDEX_KEY_MAGIC) {
250
251 return true;
252 }
253
254 return s.seekTo();
255 } else if(result > 0) {
256
257
258 return s.next();
259 }
260
261 return true;
262 }
263
264 static boolean reseekAtOrAfter(HFileScanner s, Cell k)
265 throws IOException {
266
267 int result = s.reseekTo(k);
268 if (result <= 0) {
269 if (result == HConstants.INDEX_KEY_MAGIC) {
270
271 return true;
272 }
273
274
275
276 if (!s.isSeeked()) {
277 return s.seekTo();
278 }
279 return true;
280 }
281
282
283 return s.next();
284 }
285
286 @Override
287 public long getSequenceID() {
288 return reader.getSequenceID();
289 }
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305 @Override
306 public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
307 throws IOException {
308 if (kv.getFamilyLength() == 0) {
309 useBloom = false;
310 }
311
312 boolean haveToSeek = true;
313 if (useBloom) {
314
315 if (reader.getBloomFilterType() == BloomType.ROWCOL) {
316 haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(),
317 kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
318 kv.getQualifierOffset(), kv.getQualifierLength());
319 } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
320 ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
321
322
323 haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
324 kv.getRowOffset(), kv.getRowLength());
325 }
326 }
327
328 delayedReseek = forward;
329 delayedSeekKV = kv;
330
331 if (haveToSeek) {
332
333
334 realSeekDone = false;
335 long maxTimestampInFile = reader.getMaxTimestamp();
336 long seekTimestamp = kv.getTimestamp();
337 if (seekTimestamp > maxTimestampInFile) {
338
339
340
341
342
343
344 setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
345 } else {
346
347
348
349
350 enforceSeek();
351 }
352 return cur != null;
353 }
354
355
356
357
358
359
360
361
362 setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
363
364 realSeekDone = true;
365 return true;
366 }
367
368 Reader getReader() {
369 return reader;
370 }
371
372 KeyValue.KVComparator getComparator() {
373 return reader.getComparator();
374 }
375
376 @Override
377 public boolean realSeekDone() {
378 return realSeekDone;
379 }
380
381 @Override
382 public void enforceSeek() throws IOException {
383 if (realSeekDone)
384 return;
385
386 if (delayedReseek) {
387 reseek(delayedSeekKV);
388 } else {
389 seek(delayedSeekKV);
390 }
391 }
392
393 public void setScanQueryMatcher(ScanQueryMatcher matcher) {
394 this.matcher = matcher;
395 }
396
397 @Override
398 public boolean isFileScanner() {
399 return true;
400 }
401
402
403
404 static final long getSeekCount() {
405 return seekCount.get();
406 }
407 static final void instrument() {
408 seekCount = new AtomicLong();
409 }
410
411 @Override
412 public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
413 return reader.passesTimerangeFilter(scan, oldestUnexpiredTS)
414 && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
415 }
416
417 @Override
418 @SuppressWarnings("deprecation")
419 public boolean seekToPreviousRow(Cell key) throws IOException {
420 try {
421 try {
422 KeyValue seekKey = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
423 key.getRowLength());
424 if (seekCount != null) seekCount.incrementAndGet();
425 if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
426 seekKey.getKeyLength())) {
427 close();
428 return false;
429 }
430 KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
431 .getRowArray(), hfs.getKeyValue().getRowOffset(), hfs.getKeyValue().getRowLength());
432
433 if (seekCount != null) seekCount.incrementAndGet();
434 if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
435 close();
436 return false;
437 }
438
439 setCurrentCell(hfs.getKeyValue());
440 this.stopSkippingKVsIfNextRow = true;
441 boolean resultOfSkipKVs;
442 try {
443 resultOfSkipKVs = skipKVsNewerThanReadpoint();
444 } finally {
445 this.stopSkippingKVsIfNextRow = false;
446 }
447 if (!resultOfSkipKVs
448 || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
449 return seekToPreviousRow(firstKeyOfPreviousRow);
450 }
451
452 return true;
453 } finally {
454 realSeekDone = true;
455 }
456 } catch (IOException ioe) {
457 throw new IOException("Could not seekToPreviousRow " + this + " to key "
458 + key, ioe);
459 }
460 }
461
462 @Override
463 public boolean seekToLastRow() throws IOException {
464 byte[] lastRow = reader.getLastRowKey();
465 if (lastRow == null) {
466 return false;
467 }
468 KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
469 if (seek(seekKey)) {
470 return true;
471 } else {
472 return seekToPreviousRow(seekKey);
473 }
474 }
475
476 @Override
477 public boolean backwardSeek(Cell key) throws IOException {
478 seek(key);
479 if (cur == null
480 || getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
481 cur.getRowLength(), key.getRowArray(), key.getRowOffset(),
482 key.getRowLength()) > 0) {
483 return seekToPreviousRow(key);
484 }
485 return true;
486 }
487
488 @Override
489 public Cell getNextIndexedKey() {
490 return hfs.getNextIndexedKey();
491 }
492 }