View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.util.List;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.classification.InterfaceStability;
27  import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
28  
29  /**
30   * ScannerContext instances encapsulate limit tracking AND progress towards those limits during
31   * invocations of {@link InternalScanner#next(java.util.List)} and
32   * {@link RegionScanner#next(java.util.List)}.
33   * <p>
34   * A ScannerContext instance should be updated periodically throughout execution whenever progress
35   * towards a limit has been made. Each limit can be checked via the appropriate checkLimit method.
36   * <p>
37   * Once a limit has been reached, the scan will stop. The invoker of
38   * {@link InternalScanner#next(java.util.List)} or {@link RegionScanner#next(java.util.List)} can
39   * use the appropriate check*Limit methods to see exactly which limits have been reached.
 * Alternatively, {@link #checkAnyLimitReached(LimitScope)} is provided to see if ANY limit was
 * reached.
42   * <p>
43   * {@link NoLimitScannerContext#NO_LIMIT} is an immutable static definition that can be used
44   * whenever a {@link ScannerContext} is needed but limits do not need to be enforced.
45   * <p>
 * NOTE: It is important that this class only ever expose setter methods that can be safely skipped
 * when limits should NOT be enforced. This is because of the necessary immutability of the class
48   * {@link NoLimitScannerContext}. If a setter cannot be safely skipped, the immutable nature of
49   * {@link NoLimitScannerContext} will lead to incorrect behavior.
50   */
51  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
52  @InterfaceStability.Evolving
53  public class ScannerContext {
54    private static final Log LOG = LogFactory.getLog(ScannerContext.class);
55  
56    /**
57     * Two sets of the same fields. One for the limits, another for the progress towards those limits
58     */
59    LimitFields limits;
60    LimitFields progress;
61  
62    /**
63     * The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)}
64     * or {@link RegionScanner#next(java.util.List)}.
65     */
66    NextState scannerState;
67    private static final NextState DEFAULT_STATE = NextState.MORE_VALUES;
68  
69    /**
70     * Used as an indication to invocations of {@link InternalScanner#next(java.util.List)} and
71     * {@link RegionScanner#next(java.util.List)} that, if true, the progress tracked within this
72     * {@link ScannerContext} instance should be considered while evaluating the limits. Useful for
73     * enforcing a set of limits across multiple calls (i.e. the limit may not be reached in a single
74     * invocation, but any progress made should be considered in future invocations)
75     * <p>
76     * Defaulting this value to false means that, by default, any tracked progress will be wiped clean
77     * on invocations to {@link InternalScanner#next(java.util.List)} and
78     * {@link RegionScanner#next(java.util.List)} and the call will be treated as though no progress
79     * has been made towards the limits so far.
80     * <p>
81     * This is an important mechanism. Users of Internal/Region scanners expect that they can define
82     * some limits and then repeatedly invoke {@link InternalScanner#next(List)} or
83     * {@link RegionScanner#next(List)} where each invocation respects these limits separately.
84     * <p>
85     * For example: <code><pre>
86     * ScannerContext context = new ScannerContext.newBuilder().setBatchLimit(5).build();
87     * RegionScanner scanner = ...
88     * List<Cell> results = new ArrayList<Cell>();
89     * while(scanner.next(results, context)) {
90     *   // Do something with a batch of 5 cells
91     * }
92     * </pre></code> However, in the case of RPCs, the server wants to be able to define a set of
93     * limits for a particular RPC request and have those limits respected across multiple
94     * invocations. This means that the progress made towards the limits in earlier calls will be
95     * saved and considered in future invocations
96     */
97    boolean keepProgress;
98    private static boolean DEFAULT_KEEP_PROGRESS = false;
99  
100   /**
101    * Tracks the relevant server side metrics during scans. null when metrics should not be tracked
102    */
103   final ServerSideScanMetrics metrics;
104 
105   ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
106     this.limits = new LimitFields();
107     if (limitsToCopy != null) this.limits.copy(limitsToCopy);
108 
109     // Progress fields are initialized to 0
110     progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
111 
112     this.keepProgress = keepProgress;
113     this.scannerState = DEFAULT_STATE;
114     this.metrics = trackMetrics ? new ServerSideScanMetrics() : null;
115   }
116 
117   boolean isTrackingMetrics() {
118     return this.metrics != null;
119   }
120 
121   /**
122    * Get the metrics instance. Should only be called after a call to {@link #isTrackingMetrics()}
123    * has been made to confirm that metrics are indeed being tracked.
124    * @return {@link ServerSideScanMetrics} instance that is tracking metrics for this scan
125    */
126   ServerSideScanMetrics getMetrics() {
127     assert isTrackingMetrics();
128     return this.metrics;
129   }
130 
131   /**
132    * @return true if the progress tracked so far in this instance will be considered during an
133    *         invocation of {@link InternalScanner#next(java.util.List)} or
134    *         {@link RegionScanner#next(java.util.List)}. false when the progress tracked so far
135    *         should not be considered and should instead be wiped away via {@link #clearProgress()}
136    */
137   boolean getKeepProgress() {
138     return keepProgress;
139   }
140 
141   void setKeepProgress(boolean keepProgress) {
142     this.keepProgress = keepProgress;
143   }
144 
145   /**
146    * Progress towards the batch limit has been made. Increment internal tracking of batch progress
147    */
148   void incrementBatchProgress(int batch) {
149     int currentBatch = progress.getBatch();
150     progress.setBatch(currentBatch + batch);
151   }
152 
153   /**
154    * Progress towards the size limit has been made. Increment internal tracking of size progress
155    */
156   void incrementSizeProgress(long size) {
157     long currentSize = progress.getSize();
158     progress.setSize(currentSize + size);
159   }
160 
161   /**
162    * Update the time progress with {@link System#currentTimeMillis()}
163    */
164   void updateTimeProgress() {
165     progress.setTime(System.currentTimeMillis());
166   }
167 
168   int getBatchProgress() {
169     return progress.getBatch();
170   }
171 
172   long getSizeProgress() {
173     return progress.getSize();
174   }
175 
176   long getTimeProgress() {
177     return progress.getTime();
178   }
179 
180   void setProgress(int batchProgress, long sizeProgress, long timeProgress) {
181     setBatchProgress(batchProgress);
182     setSizeProgress(sizeProgress);
183     setTimeProgress(timeProgress);
184   }
185 
186   void setSizeProgress(long sizeProgress) {
187     progress.setSize(sizeProgress);
188   }
189 
190   void setBatchProgress(int batchProgress) {
191     progress.setBatch(batchProgress);
192   }
193 
194   void setTimeProgress(long timeProgress) {
195     progress.setTime(timeProgress);
196   }
197 
198   /**
199    * Clear away any progress that has been made so far. All progress fields are reset to initial
200    * values
201    */
202   void clearProgress() {
203     progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, LimitFields.DEFAULT_SCOPE, 0);
204   }
205 
206   /**
207    * Note that this is not a typical setter. This setter returns the {@link NextState} that was
208    * passed in so that methods can be invoked against the new state. Furthermore, this pattern
209    * allows the {@link NoLimitScannerContext} to cleanly override this setter and simply return the
210    * new state, thus preserving the immutability of {@link NoLimitScannerContext}
211    * @param state
212    * @return The state that was passed in.
213    */
214   NextState setScannerState(NextState state) {
215     if (!NextState.isValidState(state)) {
216       throw new IllegalArgumentException("Cannot set to invalid state: " + state);
217     }
218 
219     this.scannerState = state;
220     return state;
221   }
222 
223   /**
224    * @return true when a partial result is formed. A partial result is formed when a limit is
225    *         reached in the middle of a row.
226    */
227   boolean partialResultFormed() {
228     return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW
229         || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW;
230   }
231 
232   /**
233    * @return true when a mid-row result is formed.
234    */
235   boolean midRowResultFormed() {
236     return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW
237         || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW
238         || scannerState == NextState.BATCH_LIMIT_REACHED;
239   }
240 
241   /**
242    * @param checkerScope
243    * @return true if the batch limit can be enforced in the checker's scope
244    */
245   boolean hasBatchLimit(LimitScope checkerScope) {
246     return limits.canEnforceBatchLimitFromScope(checkerScope) && limits.getBatch() > 0;
247   }
248 
249   /**
250    * @param checkerScope
251    * @return true if the size limit can be enforced in the checker's scope
252    */
253   boolean hasSizeLimit(LimitScope checkerScope) {
254     return limits.canEnforceSizeLimitFromScope(checkerScope) && limits.getSize() > 0;
255   }
256 
257   /**
258    * @param checkerScope
259    * @return true if the time limit can be enforced in the checker's scope
260    */
261   boolean hasTimeLimit(LimitScope checkerScope) {
262     return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0;
263   }
264 
265   /**
266    * @param checkerScope
267    * @return true if any limit can be enforced within the checker's scope
268    */
269   boolean hasAnyLimit(LimitScope checkerScope) {
270     return hasBatchLimit(checkerScope) || hasSizeLimit(checkerScope) || hasTimeLimit(checkerScope);
271   }
272 
273   /**
274    * @param scope The scope in which the size limit will be enforced
275    */
276   void setSizeLimitScope(LimitScope scope) {
277     limits.setSizeScope(scope);
278   }
279 
280   /**
281    * @param scope The scope in which the time limit will be enforced
282    */
283   void setTimeLimitScope(LimitScope scope) {
284     limits.setTimeScope(scope);
285   }
286 
287   int getBatchLimit() {
288     return limits.getBatch();
289   }
290 
291   long getSizeLimit() {
292     return limits.getSize();
293   }
294 
295   long getTimeLimit() {
296     return limits.getTime();
297   }
298 
299   /**
300    * @param checkerScope The scope that the limit is being checked from
301    * @return true when the limit is enforceable from the checker's scope and it has been reached
302    */
303   boolean checkBatchLimit(LimitScope checkerScope) {
304     return hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch();
305   }
306 
307   /**
308    * @param checkerScope The scope that the limit is being checked from
309    * @return true when the limit is enforceable from the checker's scope and it has been reached
310    */
311   boolean checkSizeLimit(LimitScope checkerScope) {
312     return hasSizeLimit(checkerScope) && progress.getSize() >= limits.getSize();
313   }
314 
315   /**
316    * @param checkerScope The scope that the limit is being checked from. The time limit is always
317    *          checked against {@link System#currentTimeMillis()}
318    * @return true when the limit is enforceable from the checker's scope and it has been reached
319    */
320   boolean checkTimeLimit(LimitScope checkerScope) {
321     return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime();
322   }
323 
324   /**
325    * @param checkerScope The scope that the limits are being checked from
326    * @return true when some limit is enforceable from the checker's scope and it has been reached
327    */
328   boolean checkAnyLimitReached(LimitScope checkerScope) {
329     return checkSizeLimit(checkerScope) || checkBatchLimit(checkerScope)
330         || checkTimeLimit(checkerScope);
331   }
332 
333   @Override
334   public String toString() {
335     StringBuilder sb = new StringBuilder();
336     sb.append("{");
337 
338     sb.append("limits:");
339     sb.append(limits);
340 
341     sb.append(", progress:");
342     sb.append(progress);
343 
344     sb.append(", keepProgress:");
345     sb.append(keepProgress);
346 
347     sb.append(", state:");
348     sb.append(scannerState);
349 
350     sb.append("}");
351     return sb.toString();
352   }
353 
354   public static Builder newBuilder() {
355     return new Builder();
356   }
357 
358   public static Builder newBuilder(boolean keepProgress) {
359     return new Builder(keepProgress);
360   }
361 
362   public static final class Builder {
363     boolean keepProgress = DEFAULT_KEEP_PROGRESS;
364     boolean trackMetrics = false;
365     LimitFields limits = new LimitFields();
366 
367     private Builder() {
368     }
369 
370     private Builder(boolean keepProgress) {
371       this.keepProgress = keepProgress;
372     }
373 
374     public Builder setKeepProgress(boolean keepProgress) {
375       this.keepProgress = keepProgress;
376       return this;
377     }
378 
379     public Builder setTrackMetrics(boolean trackMetrics) {
380       this.trackMetrics = trackMetrics;
381       return this;
382     }
383 
384     public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) {
385       limits.setSize(sizeLimit);
386       limits.setSizeScope(sizeScope);
387       return this;
388     }
389 
390     public Builder setTimeLimit(LimitScope timeScope, long timeLimit) {
391       limits.setTime(timeLimit);
392       limits.setTimeScope(timeScope);
393       return this;
394     }
395 
396     public Builder setBatchLimit(int batchLimit) {
397       limits.setBatch(batchLimit);
398       return this;
399     }
400 
401     public ScannerContext build() {
402       return new ScannerContext(keepProgress, limits, trackMetrics);
403     }
404   }
405 
406   /**
407    * The possible states a scanner may be in following a call to {@link InternalScanner#next(List)}
408    */
409   public enum NextState {
410     MORE_VALUES(true, false),
411     NO_MORE_VALUES(false, false),
412     SIZE_LIMIT_REACHED(true, true),
413 
414     /**
415      * Special case of size limit reached to indicate that the size limit was reached in the middle
416      * of a row and thus a partial results was formed
417      */
418     SIZE_LIMIT_REACHED_MID_ROW(true, true),
419     TIME_LIMIT_REACHED(true, true),
420 
421     /**
422      * Special case of time limit reached to indicate that the time limit was reached in the middle
423      * of a row and thus a partial results was formed
424      */
425     TIME_LIMIT_REACHED_MID_ROW(true, true),
426     BATCH_LIMIT_REACHED(true, true);
427 
428     private boolean moreValues;
429     private boolean limitReached;
430 
431     private NextState(boolean moreValues, boolean limitReached) {
432       this.moreValues = moreValues;
433       this.limitReached = limitReached;
434     }
435 
436     /**
437      * @return true when the state indicates that more values may follow those that have been
438      *         returned
439      */
440     public boolean hasMoreValues() {
441       return this.moreValues;
442     }
443 
444     /**
445      * @return true when the state indicates that a limit has been reached and scan should stop
446      */
447     public boolean limitReached() {
448       return this.limitReached;
449     }
450 
451     public static boolean isValidState(NextState state) {
452       return state != null;
453     }
454 
455     public static boolean hasMoreValues(NextState state) {
456       return isValidState(state) && state.hasMoreValues();
457     }
458   }
459 
460   /**
461    * The various scopes where a limit can be enforced. Used to differentiate when a limit should be
462    * enforced or not.
463    */
464   public enum LimitScope {
465     /**
466      * Enforcing a limit between rows means that the limit will not be considered until all the
467      * cells for a particular row have been retrieved
468      */
469     BETWEEN_ROWS(0),
470 
471     /**
472      * Enforcing a limit between cells means that the limit will be considered after each full cell
473      * has been retrieved
474      */
475     BETWEEN_CELLS(1);
476 
477     /**
478      * When enforcing a limit, we must check that the scope is appropriate for enforcement.
479      * <p>
480      * To communicate this concept, each scope has a depth. A limit will be enforced if the depth of
481      * the checker's scope is less than or equal to the limit's scope. This means that when checking
482      * limits, the checker must know their own scope (i.e. are they checking the limits between
483      * rows, between cells, etc...)
484      */
485     int depth;
486 
487     LimitScope(int depth) {
488       this.depth = depth;
489     }
490 
491     int depth() {
492       return depth;
493     }
494 
495     /**
496      * @param checkerScope The scope in which the limit is being checked
497      * @return true when the checker is in a scope that indicates the limit can be enforced. Limits
498      *         can be enforced from "higher or equal" scopes (i.e. the checker's scope is at a
499      *         lesser depth than the limit)
500      */
501     boolean canEnforceLimitFromScope(LimitScope checkerScope) {
502       return checkerScope != null && checkerScope.depth() <= depth;
503     }
504   }
505 
506   /**
507    * The different fields that can be used as limits in calls to
508    * {@link InternalScanner#next(java.util.List)} and {@link RegionScanner#next(java.util.List)}
509    */
510   private static class LimitFields {
511     /**
512      * Default values of the limit fields. Defined such that if a field does NOT change from its
513      * default, it will not be enforced
514      */
515     private static int DEFAULT_BATCH = -1;
516     private static long DEFAULT_SIZE = -1L;
517     private static long DEFAULT_TIME = -1L;
518 
519     /**
520      * Default scope that is assigned to a limit if a scope is not specified.
521      */
522     private static final LimitScope DEFAULT_SCOPE = LimitScope.BETWEEN_ROWS;
523 
524     // The batch limit will always be enforced between cells, thus, there isn't a field to hold the
525     // batch scope
526     int batch = DEFAULT_BATCH;
527 
528     LimitScope sizeScope = DEFAULT_SCOPE;
529     long size = DEFAULT_SIZE;
530 
531     LimitScope timeScope = DEFAULT_SCOPE;
532     long time = DEFAULT_TIME;
533 
534     /**
535      * Fields keep their default values.
536      */
537     LimitFields() {
538     }
539 
540     LimitFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
541       setFields(batch, sizeScope, size, timeScope, time);
542     }
543 
544     void copy(LimitFields limitsToCopy) {
545       if (limitsToCopy != null) {
546         setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getSize(),
547           limitsToCopy.getTimeScope(), limitsToCopy.getTime());
548       }
549     }
550 
551     /**
552      * Set all fields together.
553      * @param batch
554      * @param sizeScope
555      * @param size
556      */
557     void setFields(int batch, LimitScope sizeScope, long size, LimitScope timeScope, long time) {
558       setBatch(batch);
559       setSizeScope(sizeScope);
560       setSize(size);
561       setTimeScope(timeScope);
562       setTime(time);
563     }
564 
565     int getBatch() {
566       return this.batch;
567     }
568 
569     void setBatch(int batch) {
570       this.batch = batch;
571     }
572 
573     /**
574      * @param checkerScope
575      * @return true when the limit can be enforced from the scope of the checker
576      */
577     boolean canEnforceBatchLimitFromScope(LimitScope checkerScope) {
578       return LimitScope.BETWEEN_CELLS.canEnforceLimitFromScope(checkerScope);
579     }
580 
581     long getSize() {
582       return this.size;
583     }
584 
585     void setSize(long size) {
586       this.size = size;
587     }
588 
589     /**
590      * @return {@link LimitScope} indicating scope in which the size limit is enforced
591      */
592     LimitScope getSizeScope() {
593       return this.sizeScope;
594     }
595 
596     /**
597      * Change the scope in which the size limit is enforced
598      */
599     void setSizeScope(LimitScope scope) {
600       this.sizeScope = scope;
601     }
602 
603     /**
604      * @param checkerScope
605      * @return true when the limit can be enforced from the scope of the checker
606      */
607     boolean canEnforceSizeLimitFromScope(LimitScope checkerScope) {
608       return this.sizeScope.canEnforceLimitFromScope(checkerScope);
609     }
610 
611     long getTime() {
612       return this.time;
613     }
614 
615     void setTime(long time) {
616       this.time = time;
617     }
618 
619     /**
620      * @return {@link LimitScope} indicating scope in which the time limit is enforced
621      */
622     LimitScope getTimeScope() {
623       return this.timeScope;
624     }
625 
626     /**
627      * Change the scope in which the time limit is enforced
628      */
629     void setTimeScope(LimitScope scope) {
630       this.timeScope = scope;
631     }
632 
633     /**
634      * @param checkerScope
635      * @return true when the limit can be enforced from the scope of the checker
636      */
637     boolean canEnforceTimeLimitFromScope(LimitScope checkerScope) {
638       return this.timeScope.canEnforceLimitFromScope(checkerScope);
639     }
640 
641     @Override
642     public String toString() {
643       StringBuilder sb = new StringBuilder();
644       sb.append("{");
645 
646       sb.append("batch:");
647       sb.append(batch);
648 
649       sb.append(", size:");
650       sb.append(size);
651 
652       sb.append(", sizeScope:");
653       sb.append(sizeScope);
654 
655       sb.append(", time:");
656       sb.append(time);
657 
658       sb.append(", timeScope:");
659       sb.append(timeScope);
660 
661       sb.append("}");
662       return sb.toString();
663     }
664   }
665 }