1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.util.List;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
28
29 /**
30 * ScannerContext instances encapsulate limit tracking AND progress towards those limits during
31 * invocations of {@link InternalScanner#next(java.util.List)} and
32 * {@link RegionScanner#next(java.util.List)}.
33 * <p>
34 * A ScannerContext instance should be updated periodically throughout execution whenever progress
35 * towards a limit has been made. Each limit can be checked via the appropriate checkLimit method.
36 * <p>
37 * Once a limit has been reached, the scan will stop. The invoker of
38 * {@link InternalScanner#next(java.util.List)} or {@link RegionScanner#next(java.util.List)} can
39 * use the appropriate check*Limit methods to see exactly which limits have been reached.
40 * Alternatively, {@link #checkAnyLimitReached(LimitScope)} is provided to see if ANY limit was
 * reached.
42 * <p>
43 * {@link NoLimitScannerContext#NO_LIMIT} is an immutable static definition that can be used
44 * whenever a {@link ScannerContext} is needed but limits do not need to be enforced.
45 * <p>
46 * NOTE: It is important that this class only ever expose setter methods that can be safely skipped
 * when limits should NOT be enforced. This is because of the necessary immutability of the class
48 * {@link NoLimitScannerContext}. If a setter cannot be safely skipped, the immutable nature of
49 * {@link NoLimitScannerContext} will lead to incorrect behavior.
50 */
51 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
52 @InterfaceStability.Evolving
53 public class ScannerContext {
54 private static final Log LOG = LogFactory.getLog(ScannerContext.class);
55
56 /**
57 * Two sets of the same fields. One for the limits, another for the progress towards those limits
58 */
59 LimitFields limits;
60 LimitFields progress;
61
62 /**
63 * The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)}
64 * or {@link RegionScanner#next(java.util.List)}.
65 */
66 NextState scannerState;
67 private static final NextState DEFAULT_STATE = NextState.MORE_VALUES;
68
69 /**
70 * Used as an indication to invocations of {@link InternalScanner#next(java.util.List)} and
71 * {@link RegionScanner#next(java.util.List)} that, if true, the progress tracked within this
72 * {@link ScannerContext} instance should be considered while evaluating the limits. Useful for
73 * enforcing a set of limits across multiple calls (i.e. the limit may not be reached in a single
74 * invocation, but any progress made should be considered in future invocations)
75 * <p>
76 * Defaulting this value to false means that, by default, any tracked progress will be wiped clean
77 * on invocations to {@link InternalScanner#next(java.util.List)} and
78 * {@link RegionScanner#next(java.util.List)} and the call will be treated as though no progress
79 * has been made towards the limits so far.
80 * <p>
81 * This is an important mechanism. Users of Internal/Region scanners expect that they can define
82 * some limits and then repeatedly invoke {@link InternalScanner#next(List)} or
83 * {@link RegionScanner#next(List)} where each invocation respects these limits separately.
84 * <p>
85 * For example: <code><pre>
86 * ScannerContext context = new ScannerContext.newBuilder().setBatchLimit(5).build();
87 * RegionScanner scanner = ...
88 * List<Cell> results = new ArrayList<Cell>();
89 * while(scanner.next(results, context)) {
90 * // Do something with a batch of 5 cells
91 * }
92 * </pre></code> However, in the case of RPCs, the server wants to be able to define a set of
93 * limits for a particular RPC request and have those limits respected across multiple
94 * invocations. This means that the progress made towards the limits in earlier calls will be
95 * saved and considered in future invocations
96 */
97 boolean keepProgress;
98 private static boolean DEFAULT_KEEP_PROGRESS = false;
99
100 /**
101 * Tracks the relevant server side metrics during scans. null when metrics should not be tracked
102 */
103 final ServerSideScanMetrics metrics;
104
/**
 * Creates a context with the supplied limits and with all progress counters starting at zero.
 * @param keepProgress initial value of the keep-progress flag
 * @param limitsToCopy limits whose values are copied into this instance; when null the default
 *          limit values are kept
 * @param trackMetrics when true a new {@link ServerSideScanMetrics} instance is created,
 *          otherwise the metrics reference is left null
 */
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
  // Start from the default limits and overwrite them only when a template was handed in
  this.limits = new LimitFields();
  if (limitsToCopy != null) {
    this.limits.copy(limitsToCopy);
  }

  // Nothing has been scanned yet, so every progress value begins at 0
  this.progress =
      new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);

  this.keepProgress = keepProgress;
  this.scannerState = DEFAULT_STATE;

  if (trackMetrics) {
    this.metrics = new ServerSideScanMetrics();
  } else {
    this.metrics = null;
  }
}
116
117 boolean isTrackingMetrics() {
118 return this.metrics != null;
119 }
120
121 /**
122 * Get the metrics instance. Should only be called after a call to {@link #isTrackingMetrics()}
123 * has been made to confirm that metrics are indeed being tracked.
124 * @return {@link ServerSideScanMetrics} instance that is tracking metrics for this scan
125 */
126 ServerSideScanMetrics getMetrics() {
127 assert isTrackingMetrics();
128 return this.metrics;
129 }
130
131 /**
132 * @return true if the progress tracked so far in this instance will be considered during an
133 * invocation of {@link InternalScanner#next(java.util.List)} or
134 * {@link RegionScanner#next(java.util.List)}. false when the progress tracked so far
135 * should not be considered and should instead be wiped away via {@link #clearProgress()}
136 */
137 boolean getKeepProgress() {
138 return keepProgress;
139 }
140
141 void setKeepProgress(boolean keepProgress) {
142 this.keepProgress = keepProgress;
143 }
144
145 /**
146 * Progress towards the batch limit has been made. Increment internal tracking of batch progress
147 */
148 void incrementBatchProgress(int batch) {
149 int currentBatch = progress.getBatch();
150 progress.setBatch(currentBatch + batch);
151 }
152
153 /**
154 * Progress towards the size limit has been made. Increment internal tracking of size progress
155 */
156 void incrementSizeProgress(long size) {
157 long currentSize = progress.getSize();
158 progress.setSize(currentSize + size);
159 }
160
161 /**
162 * Update the time progress with {@link System#currentTimeMillis()}
163 */
164 void updateTimeProgress() {
165 progress.setTime(System.currentTimeMillis());
166 }
167
168 int getBatchProgress() {
169 return progress.getBatch();
170 }
171
172 long getSizeProgress() {
173 return progress.getSize();
174 }
175
176 long getTimeProgress() {
177 return progress.getTime();
178 }
179
180 void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
181 setBatchProgress(batchProgress);
182 setSizeProgress(sizeProgress);
183 setTimeProgress(timeProgress);
184 }
185
186 void setSizeProgress(long sizeProgress) {
187 progress.setSize(sizeProgress);
188 }
189
190 void setBatchProgress(int batchProgress) {
191 progress.setBatch(batchProgress);
192 }
193
194 void setTimeProgress(long timeProgress) {
195 progress.setTime(timeProgress);
196 }
197
198 /**
199 * Clear away any progress that has been made so far. All progress fields are reset to initial
200 * values
201 */
202 void clearProgress() {
203 progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
204 }
205
206 /**
207 * Note that this is not a typical setter. This setter returns the {@link NextState} that was
208 * passed in so that methods can be invoked against the new state. Furthermore, this pattern
209 * allows the {@link NoLimitScannerContext} to cleanly override this setter and simply return the
210 * new state, thus preserving the immutability of {@link NoLimitScannerContext}
211 * @param state
212 * @return The state that was passed in.
213 */
214 NextState setScannerState(NextState state) {
215 if (!NextState.isValidState(state)) {
216 throw new IllegalArgumentException("Cannot set to invalid state: " + state);
217 }
218
219 this.scannerState = state;
220 return state;
221 }
222
223 /**
224 * @return true when a partial result is formed. A partial result is formed when a limit is
225 * reached in the middle of a row.
226 */
227 boolean partialResultFormed() {
228 return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW
229 || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW;
230 }
231
232 /**
233 * @return true when a mid-row result is formed.
234 */
235 boolean midRowResultFormed() {
236 return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW
237 || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW
238 || scannerState == NextState.BATCH_LIMIT_REACHED;
239 }
240
241 /**
242 * @param checkerScope
243 * @return true if the batch limit can be enforced in the checker's scope
244 */
245 boolean hasBatchLimit(LimitScope checkerScope) {
246 return limits.canEnforceBatchLimitFromScope(checkerScope) && limits.getBatch() > 0;
247 }
248
249 /**
250 * @param checkerScope
251 * @return true if the size limit can be enforced in the checker's scope
252 */
253 boolean hasSizeLimit(LimitScope checkerScope) {
254 return limits.canEnforceSizeLimitFromScope(checkerScope) && limits.getSize() > 0;
255 }
256
257 /**
258 * @param checkerScope
259 * @return true if the time limit can be enforced in the checker's scope
260 */
261 boolean hasTimeLimit(LimitScope checkerScope) {
262 return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0;
263 }
264
265 /**
266 * @param checkerScope
267 * @return true if any limit can be enforced within the checker's scope
268 */
269 boolean hasAnyLimit(LimitScope checkerScope) {
270 return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope) || hasTimeLimit(checkerScope);
271 }
272
273 /**
274 * @param scope The scope in which the size limit will be enforced
275 */
276 void setSizeLimitScope(LimitScope scope) {
277 limits.setSizeScope(scope);
278 }
279
280 /**
281 * @param scope The scope in which the time limit will be enforced
282 */
283 void setTimeLimitScope(LimitScope scope) {
284 limits.setTimeScope(scope);
285 }
286
287 int getBatchLimit() {
288 return limits.getBatch();
289 }
290
291 long getSizeLimit() {
292 return limits.getSize();
293 }
294
295 long getTimeLimit() {
296 return limits.getTime();
297 }
298
299 /**
300 * @param checkerScope The scope that the limit is being checked from
301 * @return true when the limit is enforceable from the checker's scope and it has been reached
302 */
303 boolean checkBatchLimit(LimitScope checkerScope) {
304 return hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch();
305 }
306
307 /**
308 * @param checkerScope The scope that the limit is being checked from
309 * @return true when the limit is enforceable from the checker's scope and it has been reached
310 */
311 boolean checkSizeLimit(LimitScope checkerScope) {
312 return hasSizeLimit(checkerScope) && progress.getSize() >= limits.getSize();
313 }
314
315 /**
316 * @param checkerScope The scope that the limit is being checked from. The time limit is always
317 * checked against {@link System#currentTimeMillis()}
318 * @return true when the limit is enforceable from the checker's scope and it has been reached
319 */
320 boolean checkTimeLimit(LimitScope checkerScope) {
321 return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime();
322 }
323
324 /**
325 * @param checkerScope The scope that the limits are being checked from
326 * @return true when some limit is enforceable from the checker's scope and it has been reached
327 */
328 boolean checkAnyLimitReached(LimitScope checkerScope) {
329 return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope)
330 || checkTimeLimit(checkerScope);
331 }
332
333 @Override
334 public String toString() {
335 StringBuilder sb = new StringBuilder();
336 sb.append("{");
337
338 sb.append("limits:");
339 sb.append(limits);
340
341 sb.append(", progress:");
342 sb.append(progress);
343
344 sb.append(", keepProgress:");
345 sb.append(keepProgress);
346
347 sb.append(", state:");
348 sb.append(scannerState);
349
350 sb.append("}");
351 return sb.toString();
352 }
353
354 public static Builder newBuilder() {
355 return new Builder();
356 }
357
358 public static Builder newBuilder(boolean keepProgress) {
359 return new Builder(keepProgress);
360 }
361
362 public static final class Builder {
363 boolean keepProgress = DEFAULT_KEEP_PROGRESS;
364 boolean trackMetrics = false;
365 LimitFields limits = new LimitFields();
366
367 private Builder() {
368 }
369
370 private Builder(boolean keepProgress) {
371 this.keepProgress = keepProgress;
372 }
373
374 public Builder setKeepProgress(boolean keepProgress) {
375 this.keepProgress = keepProgress;
376 return this;
377 }
378
379 public Builder setTrackMetrics(boolean trackMetrics) {
380 this.trackMetrics = trackMetrics;
381 return this;
382 }
383
384 public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) {
385 limits.setSize(sizeLimit);
386 limits.setSizeScope(sizeScope);
387 return this;
388 }
389
390 public Builder setTimeLimit(LimitScope timeScope, long timeLimit) {
391 limits.setTime(timeLimit);
392 limits.setTimeScope(timeScope);
393 return this;
394 }
395
396 public Builder setBatchLimit(int batchLimit) {
397 limits.setBatch(batchLimit);
398 return this;
399 }
400
401 public ScannerContext build() {
402 return new ScannerContext(keepProgress, limits, trackMetrics);
403 }
404 }
405
/**
 * The possible states a scanner may be in following a call to {@link InternalScanner#next(List)}
 */
public enum NextState {
  MORE_VALUES(true, false),
  NO_MORE_VALUES(false, false),
  SIZE_LIMIT_REACHED(true, true),

  /**
   * Special case of size limit reached to indicate that the size limit was reached in the middle
   * of a row and thus a partial result was formed
   */
  SIZE_LIMIT_REACHED_MID_ROW(true, true),
  TIME_LIMIT_REACHED(true, true),

  /**
   * Special case of time limit reached to indicate that the time limit was reached in the middle
   * of a row and thus a partial result was formed
   */
  TIME_LIMIT_REACHED_MID_ROW(true, true),
  BATCH_LIMIT_REACHED(true, true);

  // Enum constants are shared singletons, so their attributes are final and never change
  private final boolean moreValues;
  private final boolean limitReached;

  /**
   * @param moreValues whether the state indicates that more values may follow
   * @param limitReached whether the state indicates that a limit has been reached
   */
  NextState(boolean moreValues, boolean limitReached) {
    this.moreValues = moreValues;
    this.limitReached = limitReached;
  }

  /**
   * @return true when the state indicates that more values may follow those that have been
   *         returned
   */
  public boolean hasMoreValues() {
    return this.moreValues;
  }

  /**
   * @return true when the state indicates that a limit has been reached and scan should stop
   */
  public boolean limitReached() {
    return this.limitReached;
  }

  /**
   * @param state the state to validate; may be null
   * @return true when the state is not null
   */
  public static boolean isValidState(NextState state) {
    return state != null;
  }

  /**
   * Null-safe variant of {@link #hasMoreValues()}.
   * @param state the state to inspect; may be null
   * @return true when the state is valid and indicates that more values may follow
   */
  public static boolean hasMoreValues(NextState state) {
    return isValidState(state) && state.hasMoreValues();
  }
}
459
/**
 * The various scopes where a limit can be enforced. Used to differentiate when a limit should be
 * enforced or not.
 */
public enum LimitScope {
  /**
   * Enforcing a limit between rows means that the limit will not be considered until all the
   * cells for a particular row have been retrieved
   */
  BETWEEN_ROWS(0),

  /**
   * Enforcing a limit between cells means that the limit will be considered after each full cell
   * has been retrieved
   */
  BETWEEN_CELLS(1);

  /**
   * When enforcing a limit, we must check that the scope is appropriate for enforcement.
   * <p>
   * To communicate this concept, each scope has a depth. A limit will be enforced if the depth of
   * the checker's scope is less than or equal to the limit's scope. This means that when checking
   * limits, the checker must know their own scope (i.e. are they checking the limits between
   * rows, between cells, etc...)
   * <p>
   * Final because enum constants are shared singletons whose depth must never change.
   */
  final int depth;

  /**
   * @param depth the depth assigned to this scope
   */
  LimitScope(int depth) {
    this.depth = depth;
  }

  /**
   * @return the depth of this scope
   */
  int depth() {
    return depth;
  }

  /**
   * @param checkerScope The scope in which the limit is being checked; may be null
   * @return true when the checker is in a scope that indicates the limit can be enforced. Limits
   *         can be enforced from "higher or equal" scopes (i.e. the checker's scope is at a
   *         lesser or equal depth than the limit). Always false for a null checker scope.
   */
  boolean canEnforceLimitFromScope(LimitScope checkerScope) {
    return checkerScope != null && checkerScope.depth() <= depth;
  }
}
505
506 /**
507 * The different fields that can be used as limits in calls to
508 * {@link InternalScanner#next(java.util.List)} and {@link RegionScanner#next(java.util.List)}
509 */
510 private static class LimitFields {
511 /**
512 * Default values of the limit fields. Defined such that if a field does NOT change from its
513 * default, it will not be enforced
514 */
515 private static int DEFAULT_BATCH = -1;
516 private static long DEFAULT_SIZE = -1L;
517 private static long DEFAULT_TIME = -1L;
518
519 /**
520 * Default scope that is assigned to a limit if a scope is not specified.
521 */
522 private static final LimitScope DEFAULT_SCOPE = LimitScope.BETWEEN_ROWS;
523
524 // The batch limit will always be enforced between cells, thus, there isn't a field to hold the
525 // batch scope
526 int batch = DEFAULT_BATCH;
527
528 LimitScope sizeScope = DEFAULT_SCOPE;
529 long size = DEFAULT_SIZE;
530
531 LimitScope timeScope = DEFAULT_SCOPE;
532 long time = DEFAULT_TIME;
533
534 /**
535 * Fields keep their default values.
536 */
537 LimitFields() {
538 }
539
540 LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
541 setFields(batch, sizeScope, size, timeScope, time);
542 }
543
544 void copy(LimitFields limitsToCopy) {
545 if (limitsToCopy != null) {
546 setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(),
547 limitsToCopy.getTimeScope(), limitsToCopy.getTime());
548 }
549 }
550
551 /**
552 * Set all fields together.
553 * @param batch
554 * @param sizeScope
555 * @param size
556 */
557 void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
558 setBatch(batch);
559 setSizeScope(sizeScope);
560 setSize(size);
561 setTimeScope(timeScope);
562 setTime(time);
563 }
564
565 int getBatch() {
566 return this.batch;
567 }
568
569 void setBatch(int batch) {
570 this.batch = batch;
571 }
572
573 /**
574 * @param checkerScope
575 * @return true when the limit can be enforced from the scope of the checker
576 */
577 boolean canEnforceBatchLimitFromScope(LimitScope checkerScope) {
578 return LimitScope.BETWEEN_CELLS.canEnforceLimitFromScope(checkerScope);
579 }
580
581 long getSize() {
582 return this.size;
583 }
584
585 void setSize(long size) {
586 this.size = size;
587 }
588
589 /**
590 * @return {@link LimitScope} indicating scope in which the size limit is enforced
591 */
592 LimitScope getSizeScope() {
593 return this.sizeScope;
594 }
595
596 /**
597 * Change the scope in which the size limit is enforced
598 */
599 void setSizeScope(LimitScope scope) {
600 this.sizeScope = scope;
601 }
602
603 /**
604 * @param checkerScope
605 * @return true when the limit can be enforced from the scope of the checker
606 */
607 boolean canEnforceSizeLimitFromScope(LimitScope checkerScope) {
608 return this.sizeScope.canEnforceLimitFromScope(checkerScope);
609 }
610
611 long getTime() {
612 return this.time;
613 }
614
615 void setTime(long time) {
616 this.time = time;
617 }
618
619 /**
620 * @return {@link LimitScope} indicating scope in which the time limit is enforced
621 */
622 LimitScope getTimeScope() {
623 return this.timeScope;
624 }
625
626 /**
627 * Change the scope in which the time limit is enforced
628 */
629 void setTimeScope(LimitScope scope) {
630 this.timeScope = scope;
631 }
632
633 /**
634 * @param checkerScope
635 * @return true when the limit can be enforced from the scope of the checker
636 */
637 boolean canEnforceTimeLimitFromScope(LimitScope checkerScope) {
638 return this.timeScope.canEnforceLimitFromScope(checkerScope);
639 }
640
641 @Override
642 public String toString() {
643 StringBuilder sb = new StringBuilder();
644 sb.append("{");
645
646 sb.append("batch:");
647 sb.append(batch);
648
649 sb.append(", size:");
650 sb.append(size);
651
652 sb.append(", sizeScope:");
653 sb.append(sizeScope);
654
655 sb.append(", time:");
656 sb.append(time);
657
658 sb.append(", timeScope:");
659 sb.append(timeScope);
660
661 sb.append("}");
662 return sb.toString();
663 }
664 }
665 }