1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.KeyValueUtil;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.io.TimeRange;
39 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
40 import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
41
42
43
44
45
46 @InterfaceAudience.LimitedPrivate("Coprocessor")
47 public class StoreFileScanner implements KeyValueScanner {
48 private static final Log LOG = LogFactory.getLog(HStore.class);
49
50
51 private final StoreFile.Reader reader;
52 private final HFileScanner hfs;
53 private Cell cur = null;
54 private boolean closed = false;
55
56 private boolean realSeekDone;
57 private boolean delayedReseek;
58 private Cell delayedSeekKV;
59
60 private boolean enforceMVCC = false;
61 private boolean hasMVCCInfo = false;
62
63
64 private boolean stopSkippingKVsIfNextRow = false;
65
66 private static AtomicLong seekCount;
67
68 private ScanQueryMatcher matcher;
69
70 private long readPt;
71
72
73
74
75
76 public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
77 boolean hasMVCC, long readPt) {
78 this.readPt = readPt;
79 this.reader = reader;
80 this.hfs = hfs;
81 this.enforceMVCC = useMVCC;
82 this.hasMVCCInfo = hasMVCC;
83 }
84
85 boolean isPrimaryReplica() {
86 return reader.isPrimaryReplicaReader();
87 }
88
89
90
91
92
93 public static List<StoreFileScanner> getScannersForStoreFiles(
94 Collection<StoreFile> files,
95 boolean cacheBlocks,
96 boolean usePread, long readPt) throws IOException {
97 return getScannersForStoreFiles(files, cacheBlocks,
98 usePread, false, false, readPt);
99 }
100
101
102
103
104 public static List<StoreFileScanner> getScannersForStoreFiles(
105 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
106 boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
107 return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
108 useDropBehind, null, readPt);
109 }
110
111
112
113
114
115
116 public static List<StoreFileScanner> getScannersForStoreFiles(
117 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
118 boolean isCompaction, boolean canUseDrop,
119 ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
120 List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
121 files.size());
122 boolean succ = false;
123 try {
124 for (StoreFile file : files) {
125 StoreFile.Reader r = file.createReader(canUseDrop);
126 r.setReplicaStoreFile(isPrimaryReplica);
127 StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
128 isCompaction, readPt);
129 scanner.setScanQueryMatcher(matcher);
130 scanners.add(scanner);
131 }
132 succ = true;
133 } finally {
134 if (!succ) {
135 for (StoreFileScanner scanner : scanners) {
136 scanner.close();
137 }
138 }
139 }
140 return scanners;
141 }
142
143 public static List<StoreFileScanner> getScannersForStoreFiles(
144 Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
145 boolean isCompaction, boolean canUseDrop,
146 ScanQueryMatcher matcher, long readPt) throws IOException {
147 return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
148 matcher, readPt, true);
149 }
150
151 public String toString() {
152 return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
153 }
154
155 public Cell peek() {
156 return cur;
157 }
158
159 public Cell next() throws IOException {
160 Cell retKey = cur;
161
162 try {
163
164 if (cur != null) {
165 hfs.next();
166 setCurrentCell(hfs.getKeyValue());
167 if (hasMVCCInfo || this.reader.isBulkLoaded()) {
168 skipKVsNewerThanReadpoint();
169 }
170 }
171 } catch (FileNotFoundException e) {
172 throw e;
173 } catch(IOException e) {
174 throw new IOException("Could not iterate " + this, e);
175 }
176 return retKey;
177 }
178
179 public boolean seek(Cell key) throws IOException {
180 if (seekCount != null) seekCount.incrementAndGet();
181
182 try {
183 try {
184 if(!seekAtOrAfter(hfs, key)) {
185 close();
186 return false;
187 }
188
189 setCurrentCell(hfs.getKeyValue());
190
191 if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
192 return skipKVsNewerThanReadpoint();
193 } else {
194 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
195 }
196 } finally {
197 realSeekDone = true;
198 }
199 } catch (FileNotFoundException e) {
200 throw e;
201 } catch (IOException ioe) {
202 throw new IOException("Could not seek " + this + " to key " + key, ioe);
203 }
204 }
205
206 public boolean reseek(Cell key) throws IOException {
207 if (seekCount != null) seekCount.incrementAndGet();
208
209 try {
210 try {
211 if (!reseekAtOrAfter(hfs, key)) {
212 close();
213 return false;
214 }
215 setCurrentCell(hfs.getKeyValue());
216
217 if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
218 return skipKVsNewerThanReadpoint();
219 } else {
220 return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
221 }
222 } finally {
223 realSeekDone = true;
224 }
225 } catch (FileNotFoundException e) {
226 throw e;
227 } catch (IOException ioe) {
228 throw new IOException("Could not reseek " + this + " to key " + key,
229 ioe);
230 }
231 }
232
233 protected void setCurrentCell(Cell newVal) throws IOException {
234 this.cur = newVal;
235 if (this.cur != null && this.reader.isBulkLoaded()) {
236 CellUtil.setSequenceId(cur, this.reader.getSequenceID());
237 }
238 }
239
240 protected boolean skipKVsNewerThanReadpoint() throws IOException {
241
242
243 Cell startKV = cur;
244 while(enforceMVCC
245 && cur != null
246 && (cur.getMvccVersion() > readPt)) {
247 hfs.next();
248 setCurrentCell(hfs.getKeyValue());
249 if (this.stopSkippingKVsIfNextRow
250 && getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
251 cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
252 startKV.getRowLength()) > 0) {
253 return false;
254 }
255 }
256
257 if (cur == null) {
258 close();
259 return false;
260 }
261
262 return true;
263 }
264
265 public void close() {
266 cur = null;
267 if (closed) return;
268 closed = true;
269 this.hfs.close();
270 }
271
272
273
274
275
276
277
278
279 public static boolean seekAtOrAfter(HFileScanner s, Cell k)
280 throws IOException {
281 int result = s.seekTo(k);
282 if(result < 0) {
283 if (result == HConstants.INDEX_KEY_MAGIC) {
284
285 return true;
286 }
287
288 return s.seekTo();
289 } else if(result > 0) {
290
291
292 return s.next();
293 }
294
295 return true;
296 }
297
298 static boolean reseekAtOrAfter(HFileScanner s, Cell k)
299 throws IOException {
300
301 int result = s.reseekTo(k);
302 if (result <= 0) {
303 if (result == HConstants.INDEX_KEY_MAGIC) {
304
305 return true;
306 }
307
308
309
310 if (!s.isSeeked()) {
311 return s.seekTo();
312 }
313 return true;
314 }
315
316
317 return s.next();
318 }
319
320 @Override
321 public long getSequenceID() {
322 return reader.getSequenceID();
323 }
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339 @Override
340 public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
341 throws IOException {
342 if (kv.getFamilyLength() == 0) {
343 useBloom = false;
344 }
345
346 boolean haveToSeek = true;
347 if (useBloom) {
348
349 if (reader.getBloomFilterType() == BloomType.ROWCOL) {
350 haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(),
351 kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
352 kv.getQualifierOffset(), kv.getQualifierLength());
353 } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
354 ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
355
356
357 haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
358 kv.getRowOffset(), kv.getRowLength());
359 }
360 }
361
362 delayedReseek = forward;
363 delayedSeekKV = kv;
364
365 if (haveToSeek) {
366
367
368 realSeekDone = false;
369 long maxTimestampInFile = reader.getMaxTimestamp();
370 long seekTimestamp = kv.getTimestamp();
371 if (seekTimestamp > maxTimestampInFile) {
372
373
374
375
376
377
378 setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
379 } else {
380
381
382
383
384 enforceSeek();
385 }
386 return cur != null;
387 }
388
389
390
391
392
393
394
395
396 setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
397
398 realSeekDone = true;
399 return true;
400 }
401
402 Reader getReader() {
403 return reader;
404 }
405
406 KeyValue.KVComparator getComparator() {
407 return reader.getComparator();
408 }
409
410 @Override
411 public boolean realSeekDone() {
412 return realSeekDone;
413 }
414
415 @Override
416 public void enforceSeek() throws IOException {
417 if (realSeekDone)
418 return;
419
420 if (delayedReseek) {
421 reseek(delayedSeekKV);
422 } else {
423 seek(delayedSeekKV);
424 }
425 }
426
427 public void setScanQueryMatcher(ScanQueryMatcher matcher) {
428 this.matcher = matcher;
429 }
430
431 @Override
432 public boolean isFileScanner() {
433 return true;
434 }
435
436
437
438 static final long getSeekCount() {
439 return seekCount.get();
440 }
441 static final void instrument() {
442 seekCount = new AtomicLong();
443 }
444
445 @Override
446 public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
447
448 byte[] cf = store.getFamily().getName();
449 TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
450 if (timeRange == null) {
451 timeRange = scan.getTimeRange();
452 }
453 return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader
454 .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
455 }
456
457 @Override
458 @SuppressWarnings("deprecation")
459 public boolean seekToPreviousRow(Cell originalKey) throws IOException {
460 try {
461 try {
462 boolean keepSeeking = false;
463 Cell key = originalKey;
464 do {
465 KeyValue seekKey = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
466 key.getRowLength());
467 if (seekCount != null) seekCount.incrementAndGet();
468 if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
469 seekKey.getKeyLength())) {
470 close();
471 return false;
472 }
473 KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
474 .getRowArray(), hfs.getKeyValue().getRowOffset(), hfs.getKeyValue().getRowLength());
475
476 if (seekCount != null) seekCount.incrementAndGet();
477 if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
478 close();
479 return false;
480 }
481
482 setCurrentCell(hfs.getKeyValue());
483 this.stopSkippingKVsIfNextRow = true;
484 boolean resultOfSkipKVs;
485 try {
486 resultOfSkipKVs = skipKVsNewerThanReadpoint();
487 } finally {
488 this.stopSkippingKVsIfNextRow = false;
489 }
490 if (!resultOfSkipKVs
491 || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
492 keepSeeking = true;
493 key = firstKeyOfPreviousRow;
494 continue;
495 } else {
496 keepSeeking = false;
497 }
498 } while (keepSeeking);
499 return true;
500 } finally {
501 realSeekDone = true;
502 }
503 } catch (FileNotFoundException e) {
504 throw e;
505 } catch (IOException ioe) {
506 throw new IOException("Could not seekToPreviousRow " + this + " to key "
507 + originalKey, ioe);
508 }
509 }
510
511 @Override
512 public boolean seekToLastRow() throws IOException {
513 byte[] lastRow = reader.getLastRowKey();
514 if (lastRow == null) {
515 return false;
516 }
517 KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
518 if (seek(seekKey)) {
519 return true;
520 } else {
521 return seekToPreviousRow(seekKey);
522 }
523 }
524
525 @Override
526 public boolean backwardSeek(Cell key) throws IOException {
527 seek(key);
528 if (cur == null
529 || getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
530 cur.getRowLength(), key.getRowArray(), key.getRowOffset(),
531 key.getRowLength()) > 0) {
532 return seekToPreviousRow(key);
533 }
534 return true;
535 }
536
537 @Override
538 public Cell getNextIndexedKey() {
539 return hfs.getNextIndexedKey();
540 }
541 }