View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellComparator;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.DoNotRetryIOException;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.HRegionInfo;
31  import org.apache.hadoop.hbase.NotServingRegionException;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.UnknownScannerException;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
36  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
37  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
39  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
40  import org.apache.hadoop.hbase.util.Bytes;
41  
42  import java.io.IOException;
43  import java.io.InterruptedIOException;
44  import java.util.ArrayList;
45  import java.util.Arrays;
46  import java.util.LinkedList;
47  import java.util.List;
48  import java.util.Queue;
49  import java.util.concurrent.ExecutorService;
50  
51  /**
52   * Implements the scanner interface for the HBase client.
53   * If there are multiple regions in a table, this scanner will iterate
54   * through them all.
55   */
56  @InterfaceAudience.Private
57  public abstract class ClientScanner extends AbstractClientScanner {
58      private static final Log LOG = LogFactory.getLog(ClientScanner.class);
59      // A byte array in which every element is the max byte value; it is used to
60      // construct the closest row in front of (i.e. before) a given row.
61      static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
        // The Scan this scanner executes. Its start row is rewritten as the scanner advances
        // (see getScannerCallable and the reset path in loadCache).
62      protected Scan scan;
        // Checked by nextWithSyncCache() and loadCache() to short-circuit once the scanner is closed.
63      protected boolean closed = false;
64      // Region the current scanner is running against.  Gets cleared if the current region goes
65      // wonky: e.g. if it splits on us.
66      protected HRegionInfo currentRegion = null;
        // Callable for the currently open region scanner; null when no scanner is open
        // (nextScanner and loadCache both reset it to null).
67      protected ScannerCallableWithReplicas callable = null;
        // Results already fetched from the server and waiting to be handed to the caller.
68      protected Queue<Result> cache;
69      /**
70       * A list of partial results that have been returned from the server. This list should only
71       * contain results if this scanner does not have enough partial results to form the complete
72       * result.
73       */
74      protected final LinkedList<Result> partialResults = new LinkedList<Result>();
75      /**
76       * The row for which we are accumulating partial Results (i.e. the row of the Results stored
77       * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync
78       * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
79       */
80      protected byte[] partialResultsRow = null;
81      /**
82       * The last cell of an incomplete row that has been added to the cache.
83       */
84      protected Cell lastCellLoadedToCache = null;
        // Number of rows requested per fetch: Scan#getCaching() when positive, otherwise the
        // configured HConstants.HBASE_CLIENT_SCANNER_CACHING value.
85      protected final int caching;
        // System.currentTimeMillis() at construction / at the last successful scanner call;
        // used for scanner-timeout detection and the time-between-nexts metric.
86      protected long lastNext;
87      // Keep the last Result returned successfully in case we have to reset the scanner.
88      protected Result lastResult = null;
        // Size budget for one loadCache() pass: Scan#getMaxResultSize() when positive, otherwise
        // the configured HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY value.
89      protected final long maxScannerResultSize;
90      private final ClusterConnection connection;
91      private final TableName tableName;
        // Timeout passed to every scanner call; also added to lastNext to decide whether an
        // UnknownScannerException is reported as a ScannerTimeoutException.
92      protected final int scannerTimeout;
        // Set by writeScanMetrics() so the metrics are written to the Scan only once.
93      protected boolean scanMetricsPublished = false;
94      protected RpcRetryingCaller<Result []> caller;
95      protected RpcControllerFactory rpcControllerFactory;
96      protected Configuration conf;
97      //The timeout on the primary. Applicable if there are multiple replicas for a region.
98      //In that case, we will only wait for this much timeout on the primary before going
99      //to the replicas and trying the same scan. Note that the retries will still happen
100     //on each replica and the first successful results will be taken. A timeout of 0 is
101     //disallowed.
102     protected final int primaryOperationTimeout;
        // Read from HConstants.HBASE_CLIENT_RETRIES_NUMBER in the constructor.
103     private int retries;
        // Executor handed to ScannerCallableWithReplicas in getScannerCallable().
104     protected final ExecutorService pool;
105 
106   /**
107    * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s
108    * start row may be changed as the scanner moves from region to region.
       * <p>The constructor finishes by calling {@link #initCache()} and
       * {@link #initializeScannerInConstruction()}, so the first region scanner is opened before
       * the constructor returns.
109    * @param conf The {@link Configuration} to use.
110    * @param scan {@link Scan} to use in this scanner
111    * @param tableName The table that we wish to scan
112    * @param connection Connection identifying the cluster
       * @param rpcFactory factory used to create the {@link RpcRetryingCaller} for scanner calls
       * @param controllerFactory RPC controller factory passed on to each ScannerCallable
       * @param pool executor passed on to ScannerCallableWithReplicas
       * @param primaryOperationTimeout how long to wait on the primary replica before trying the
       *          other replicas
113    * @throws IOException if opening the first scanner fails
114    */
115   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
116       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
117       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
118       throws IOException {
119       if (LOG.isTraceEnabled()) {
120         LOG.trace("Scan table=" + tableName
121             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
122       }
123       this.scan = scan;
124       this.tableName = tableName;
125       this.lastNext = System.currentTimeMillis();
126       this.connection = connection;
127       this.pool = pool;
128       this.primaryOperationTimeout = primaryOperationTimeout;
129       this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
130           HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
          // A positive max result size on the Scan wins over the configured default.
131       if (scan.getMaxResultSize() > 0) {
132         this.maxScannerResultSize = scan.getMaxResultSize();
133       } else {
134         this.maxScannerResultSize = conf.getLong(
135           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
136           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
137       }
138       this.scannerTimeout = HBaseConfiguration.getInt(conf,
139         HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
140         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
141         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
142 
143       // check if application wants to collect scan metrics
144       initScanMetrics(scan);
145 
146       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
147       if (this.scan.getCaching() > 0) {
148         this.caching = this.scan.getCaching();
149       } else {
150         this.caching = conf.getInt(
151             HConstants.HBASE_CLIENT_SCANNER_CACHING,
152             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
153       }
154 
155       this.caller = rpcFactory.<Result[]> newCaller();
156       this.rpcControllerFactory = controllerFactory;
157 
158       this.conf = conf;
          // Both calls below are overridable and run while the subclass constructor has not yet
          // executed, so overrides must not rely on subclass fields being initialized.
159       initCache();
160       initializeScannerInConstruction();
161     }
162 
163     protected abstract void initCache(); // Constructor hook, called before the first scanner is opened.
        // NOTE(review): presumably subclasses create the result cache here (cf. initSyncCache());
        // confirm against the concrete subclasses.
164 
165     protected void initializeScannerInConstruction() throws IOException{
166       // Last step of the constructor: open the scanner on the first region, asking for
          // this.caching rows and passing done=false.
167       nextScanner(this.caching, false);
168     }
169 
170     protected ClusterConnection getConnection() {
          // The cluster connection supplied to the constructor.
171       return this.connection;
172     }
173 
174     protected TableName getTable() {
          // The table name supplied to the constructor.
175       return this.tableName;
176     }
177 
178     protected int getRetries() {
          // Retry count read from HConstants.HBASE_CLIENT_RETRIES_NUMBER in the constructor.
179       return this.retries;
180     }
181 
182     protected int getScannerTimeout() {
          // Scanner timeout resolved from the configuration in the constructor.
183       return this.scannerTimeout;
184     }
185 
186     protected Configuration getConf() {
          // The Configuration supplied to the constructor.
187       return this.conf;
188     }
189 
190     protected Scan getScan() {
          // Same Scan instance that was passed to the constructor (not a copy); its start row is
          // modified by getScannerCallable() as the scan progresses.
191       return scan;
192     }
193 
194     protected ExecutorService getPool() {
          // The executor supplied to the constructor.
195       return pool;
196     }
197 
198     protected int getPrimaryOperationTimeout() {
          // Timeout on the primary replica, as supplied to the constructor.
199       return primaryOperationTimeout;
200     }
201 
202     protected int getCaching() {
          // Rows requested per fetch: Scan#getCaching() when positive, else the configured default.
203       return caching;
204     }
205 
206     protected long getTimestamp() {
          // Millisecond timestamp (System.currentTimeMillis()) recorded at construction and after
          // each successful scanner call in loadCache().
207       return lastNext;
208     }
209 
210     @VisibleForTesting
211     protected long getMaxResultSize() {
          // Size budget used by loadCache(): Scan#getMaxResultSize() when positive, else the
          // configured default.
212       return maxScannerResultSize;
213     }
214 
215     // Returns true if the scan has a stop row and the passed region endKey is equal to or
        // greater than that stop row, i.e. the scan does not need to continue past this region.
        // Returns false when no stop row is set (empty stop row) or the region ends before it.
216     protected boolean checkScanStopRow(final byte [] endKey) {
217       if (this.scan.getStopRow().length > 0) {
218         // there is a stop row, check to see if we are past it.
219         byte [] stopRow = scan.getStopRow();
220         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
221           endKey, 0, endKey.length);
222         if (cmp <= 0) {
223           // stopRow <= endKey (endKey is equal to or larger than stopRow)
224           // This is a stop.
225           return true;
226         }
227       }
228       return false; // no stop row, or the region ends before the stop row
229     }
230 
231     private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
232       // If we have just switched replica, don't go to the next scanner yet. Rather, try
233       // the scanner operations on the new replica, from the right point in the scan.
234       // Note that when we switched to a different replica we left it at a point
235       // where we just did the "openScanner" with the appropriate startrow.
          // Returning true here keeps the loadCache() loop going on the current callable.
236       if (callable != null && callable.switchedToADifferentReplica()) return true;
          // Otherwise delegate: true if a scanner on the next region was opened, false if the
          // scan is finished.
237       return nextScanner(nbRows, done);
238     }
239 
240     /*
241      * Gets a scanner for the next region.  If this.currentRegion != null, then
242      * we will move to the endrow of this.currentRegion.  Else we will get
243      * scanner at the scan.getStartRow().  We will go no further, just tidy
244      * up outstanding scanners, if <code>currentRegion != null</code> and
245      * <code>done</code> is true.
         * Any scanner that is currently open is closed first. If opening the new scanner fails
         * with an IOException, this scanner is closed and the exception is rethrown.
246      * @param nbRows caching value set on the newly created scanner callable
247      * @param done Server-side says we're done scanning.
         * @return true if a scanner on the next region was opened; false if the scan is finished
         *         (end of table, stop row reached, or done), in which case close() has been called
248      */
249   protected boolean nextScanner(int nbRows, final boolean done)
250     throws IOException {
251       // Close the previous scanner if it's open
252       if (this.callable != null) {
253         this.callable.setClose();
254         call(callable, caller, scannerTimeout);
255         this.callable = null;
256       }
257 
258       // Where to start the next scanner
259       byte [] localStartKey;
260 
261       // if we're at end of table, close and return false to stop iterating
262       if (this.currentRegion != null) {
263         byte [] endKey = this.currentRegion.getEndKey();
            // A null or empty end key is treated as the end of the table.
264         if (endKey == null ||
265             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
266             checkScanStopRow(endKey) ||
267             done) {
268           close();
269           if (LOG.isTraceEnabled()) {
270             LOG.trace("Finished " + this.currentRegion);
271           }
272           return false;
273         }
            // The next scanner starts where the region we just finished ends.
274         localStartKey = endKey;
275         if (LOG.isTraceEnabled()) {
276           LOG.trace("Finished " + this.currentRegion);
277         }
278       } else {
            // No current region (first call, or the region was cleared after a reset): start
            // from the Scan's start row.
279         localStartKey = this.scan.getStartRow();
280       }
281 
282       if (LOG.isDebugEnabled() && this.currentRegion != null) {
283         // Only worth logging if NOT first region in scan.
284         LOG.debug("Advancing internal scanner to startKey at '" +
285           Bytes.toStringBinary(localStartKey) + "'");
286       }
287       try {
288         callable = getScannerCallable(localStartKey, nbRows);
289         // Open a scanner on the region server starting at the
290         // beginning of the region
291         call(callable, caller, scannerTimeout);
292         this.currentRegion = callable.getHRegionInfo();
293         if (this.scanMetrics != null) {
294           this.scanMetrics.countOfRegions.incrementAndGet();
295         }
296       } catch (IOException e) {
            // Do not leave a half-opened scanner behind.
297         close();
298         throw e;
299       }
300       return true;
301     }
302 
303   @VisibleForTesting
304   boolean isAnyRPCcancelled() {
        // Test hook: reports whether the current scanner callable had any of its RPCs cancelled.
        // callable is reset to null by nextScanner() (after closing the previous scanner) and by
        // loadCache() (after a scanner reset), so guard against that instead of throwing a
        // NullPointerException; with no callable there is no RPC that could have been cancelled.
305     return callable != null && callable.isAnyRPCcancelled();
306   }
307 
308   Result[] call(ScannerCallableWithReplicas callable,
309       RpcRetryingCaller<Result[]> caller, int scannerTimeout)
310       throws IOException, RuntimeException {
        // Runs the given scanner callable once through the caller, using scannerTimeout as the
        // call timeout, and returns whatever the callable returns.
        // Thread.interrupted() also clears the thread's interrupt status; the interruption is
        // reported to the caller as an InterruptedIOException instead.
311     if (Thread.interrupted()) {
312       throw new InterruptedIOException();
313     }
314     // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
315     // we do a callWithRetries
316     return caller.callWithoutRetries(callable, scannerTimeout);
317   }
318 
319     @InterfaceAudience.Private
320     protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
321         int nbRows) {
          // Builds the callable used to open and drive a scanner starting at localStartKey.
          // Side effect: the shared Scan's start row is overwritten with localStartKey.
322       scan.setStartRow(localStartKey);
323       ScannerCallable s =
324           new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
325               this.rpcControllerFactory);
          // nbRows is the caching value for this callable; loadCache() resets it to this.caching.
326       s.setCaching(nbRows);
          // Wrap the plain callable so the scan can be run against region replicas.
327       ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
328        s, pool, primaryOperationTimeout, scan,
329        retries, scannerTimeout, caching, conf, caller);
330       return sr;
331     }
332 
333     /**
334      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
335      * application or TableInputFormat. Later, we could push it to other systems. We don't use
336      * metrics framework because it doesn't support multi-instances of the same metrics on the same
337      * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
338      *
339      * By default, scan metrics are disabled; if the application wants to collect them, this
340      * behavior can be turned on by calling {@link Scan#setScanMetricsEnabled(boolean)}
341      *
342      * <p>This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
         *
         * <p>Does nothing if metrics collection is off (scanMetrics is null) or the metrics have
         * already been published by an earlier call.
343      */
344     protected void writeScanMetrics() {
345       if (this.scanMetrics == null || scanMetricsPublished) {
346         return;
347       }
          // Serialize the metrics to protobuf bytes and attach them to the Scan as an attribute.
348       MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
349       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
350       scanMetricsPublished = true;
351     }
352 
353     protected void initSyncCache() {
        // Replaces the cache with a new, empty LinkedList.
354     cache = new LinkedList<Result>();
355   }
356 
357     protected Result nextWithSyncCache() throws IOException {
          // Returns the next cached Result, calling loadCache() first when the cache is empty.
          // Returns null when there is nothing more to return.
358       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
359       if (cache.size() == 0 && this.closed) {
360         return null;
361       }
362       if (cache.size() == 0) {
363         loadCache();
364       }
365 
366       if (cache.size() > 0) {
367         return cache.poll();
368       }
369 
370       // if we exhausted this scanner before calling close, write out the scan metrics
371       writeScanMetrics();
372       return null;
373     }
374 
375   @VisibleForTesting
376   public int getCacheSize() {
        // Number of Results currently held in the cache; 0 if the cache has not been created.
        // Returns the same value as getCacheCount().
377     return cache != null ? cache.size() : 0;
378   }
379 
380   /**
381    * Contact the servers to load more {@link Result}s in the cache.
       * <p>Returns immediately if the scanner is closed. Otherwise it keeps calling the current
       * scanner callable, adding the returned Results to the cache, and moving on to the next
       * region when needed, until the row budget ({@code caching}) or size budget
       * ({@code maxScannerResultSize}) is used up, the server reports that it has more results,
       * or the scan is finished. A heartbeat response ends the loop early once the cache holds
       * at least one Result.
       * <p>Certain {@link DoNotRetryIOException}s (unknown scanner within the timeout, region not
       * serving, region server stopped, out-of-order next) cause the scanner to be reset and
       * reopened after the last Result seen instead of being thrown.
       * @throws IOException if a scanner call fails with an error that is not handled by a reset,
       *           including a ScannerTimeoutException when the scanner timeout has elapsed
382    */
383   protected void loadCache() throws IOException {
384     // check if scanner was closed during previous prefetch
385     if (closed) return;
386     Result[] values = null;
387     long remainingResultSize = maxScannerResultSize;
388     int countdown = this.caching;
389     // We need to reset it if it's a new callable that was created with a countdown in nextScanner
390     callable.setCaching(this.caching);
391     // True while one more retry after an OutOfOrderScannerNextException is allowed. It is
392     // cleared on the first such exception and set again after every successful call.
393     boolean retryAfterOutOfOrderException = true;
394     // We don't expect that the server will have more results for us if
395     // it doesn't tell us otherwise. We rely on the size or count of results
396     boolean serverHasMoreResults = false;
397     boolean allResultsSkipped = false;
398     do {
399       allResultsSkipped = false;
400       try {
401         // Server returns a null values if scanning is to stop. Else,
402         // returns an empty array if scanning is to go on and we've just
403         // exhausted current region.
404         values = call(callable, caller, scannerTimeout);
405         // When the replica switch happens, we need to do certain operations again.
406         // The callable will openScanner with the right startkey but we need to pick up
407         // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
408         // of the loop as it happens for the cases where we see exceptions.
409         // Since only openScanner would have happened, values would be null
410         if (values == null && callable.switchedToADifferentReplica()) {
411           // Any accumulated partial results are no longer valid since the callable will
412           // openScanner with the correct startkey and we must pick up from there
413           clearPartialResults();
414           this.currentRegion = callable.getHRegionInfo();
415           continue;
416         }
417         retryAfterOutOfOrderException = true;
418       } catch (DoNotRetryIOException e) {
419         // An exception was thrown which makes any partial results that we were collecting
420         // invalid. The scanner will need to be reset to the beginning of a row.
421         clearPartialResults();
422         // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
423         // to reset the scanner and come back in again.
424         if (e instanceof UnknownScannerException) {
425           long timeout = lastNext + scannerTimeout;
426           // If we are over the timeout, throw this exception to the client wrapped in
427           // a ScannerTimeoutException. Else, it's because the region moved and we used the old
428           // id against the new region server; reset the scanner.
429           if (timeout < System.currentTimeMillis()) {
430             LOG.info("For hints related to the following exception, please try taking a look at: "
431                     + "https://hbase.apache.org/book.html#trouble.client.scantimeout");
432             long elapsed = System.currentTimeMillis() - lastNext;
433             ScannerTimeoutException ex =
434                 new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
435                     + "timeout is currently set to " + scannerTimeout);
436             ex.initCause(e);
437             throw ex;
438           }
439         } else {
440           // If exception is any but the list below throw it back to the client; else setup
441           // the scanner and retry.
442           Throwable cause = e.getCause();
443           if ((cause != null && cause instanceof NotServingRegionException) ||
444               (cause != null && cause instanceof RegionServerStoppedException) ||
445               e instanceof OutOfOrderScannerNextException) {
446             // Pass. It is easier writing the if test as a list of what is allowed rather than
447             // as a list of what is not allowed... so if in here, it means we do not throw.
448           } else {
449             throw e;
450           }
451         }
452         // Else, it's a signal from the depths of ScannerCallable that we need to reset the scanner.
453         if (this.lastResult != null) {
454           // The region has moved. We need to open a brand new scanner at the new location.
455           // Reset the startRow to the row we've seen last so that the new scanner starts at
456           // the correct row. Otherwise we may see previously returned rows again.
457           // (ScannerCallable by now has "relocated" the correct region)
458           if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
                // The last row was returned completely: restart just past it (just before it for
                // a reversed scan).
459             if (scan.isReversed()) {
460               scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
461             } else {
462               scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
463             }
464           } else {
465             // we need to rescan this row because we only loaded part of it before
466             scan.setStartRow(lastResult.getRow());
467           }
468         }
469         if (e instanceof OutOfOrderScannerNextException) {
470           if (retryAfterOutOfOrderException) {
471             retryAfterOutOfOrderException = false;
472           } else {
473             // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
474             throw new DoNotRetryIOException("Failed after retry of " +
475                 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
476           }
477         }
478         // Clear region.
479         this.currentRegion = null;
480         // Set this to null so we don't try and do an rpc and close on remote server when
481         // the exception we got was UnknownScanner or the Server is going down.
482         callable = null;
483         // This continue will take us to while at end of loop where we will set up new scanner.
484         continue;
485       }
486       long currentTime = System.currentTimeMillis();
487       if (this.scanMetrics != null) {
488         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
489       }
490       lastNext = currentTime;
491       // Groom the array of Results that we received back from the server before adding those
492       // Results to the scanner's cache. If partial results are not allowed to be seen by the
493       // caller, all book keeping will be performed within this method.
494       List<Result> resultsToAddToCache =
495           getResultsToAddToCache(values, callable.isHeartbeatMessage());
496       if (!resultsToAddToCache.isEmpty()) {
497         for (Result rs : resultsToAddToCache) {
              // filterLoadedCell returns null when the whole Result is to be skipped.
498           rs = filterLoadedCell(rs);
499           if (rs == null) {
500             continue;
501           }
502 
503           cache.add(rs);
504           long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
              // Charge this Result against both the row budget and the size budget.
505           countdown--;
506           remainingResultSize -= estimatedHeapSizeOfResult;
507           addEstimatedSize(estimatedHeapSizeOfResult);
508           this.lastResult = rs;
509           if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
510             updateLastCellLoadedToCache(this.lastResult);
511           } else {
512             this.lastCellLoadedToCache = null;
513           }
514         }
515         if (cache.isEmpty()) {
516           // all results have been seen before, we need to scan more.
517           allResultsSkipped = true;
518           continue;
519         }
520       }
521       if (callable.isHeartbeatMessage()) {
522         if (cache.size() > 0) {
523           // Caller of this method just wants a Result. If we see a heartbeat message, it means
524           // processing of the scan is taking a long time server side. Rather than continue to
525           // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
526           // unnecessary delays to the caller
527           if (LOG.isTraceEnabled()) {
528             LOG.trace("Heartbeat message received and cache contains Results."
529                     + " Breaking out of scan loop");
530           }
531           break;
532         }
533         continue;
534       }
535 
536       // We expect that the server won't have more results for us when we exhaust
537       // the size (bytes or count) of the results returned. If the server *does* inform us that
538       // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
539       // get results is the moreResults context valid.
540       if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
541         // Only adhere to more server results when we don't have any partialResults
542         // as it keeps the outer loop logic the same.
543         serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
544       }
545       // Values == null means server-side filter has determined we must STOP
546       // !partialResults.isEmpty() means that we are still accumulating partial Results for a
547       // row. We should not change scanners before we receive all the partial Results for that
548       // row.
549     } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
550         || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
551         && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
552   }
553 
554   /**
       * Decides whether loadCache() should move on from the current region: there is still budget
       * left (both size and rows) and the server has not reported more results for this region.
555    * @param remainingResultSize what is left of the size budget for this loadCache() pass
556    * @param remainingRows what is left of the row (caching) budget for this loadCache() pass
557    * @param regionHasMoreResults whether the server reported more results for the current region
558    * @return true when the current region has been exhausted. When the current region has been
559    *         exhausted, the region must be changed before scanning can continue
560    */
561   private boolean doneWithRegion(long remainingResultSize, int remainingRows,
562       boolean regionHasMoreResults) {
563     return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
564   }
565 
566   protected long calcEstimatedSize(Result rs) {
        // Sums CellUtil.estimatedHeapSizeOf over every cell of the given Result.
567     long estimatedHeapSizeOfResult = 0;
568     // We don't make Iterator here
569     for (Cell cell : rs.rawCells()) {
570       estimatedHeapSizeOfResult += CellUtil.estimatedHeapSizeOf(cell);
571     }
572     return estimatedHeapSizeOfResult;
573   }
574 
575   protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
        // Intentionally a no-op here. loadCache() calls this with the estimated size of every
        // Result it adds to the cache, so a subclass can override it to track that size.
576     return;
577   }
578 
579   @VisibleForTesting
580   public int getCacheCount() {
        // Number of Results currently held in the cache; 0 if the cache has not been created.
        // Returns the same value as getCacheSize().
581     return cache != null ? cache.size() : 0;
582   }
583 
584   /**
585    * This method ensures all of our book keeping regarding partial results is kept up to date. This
586    * method should be called once we know that the results we received back from the RPC request do
587    * not contain errors. We return a list of results that should be added to the cache. In general,
588    * this list will contain all NON-partial results from the input array (unless the client has
589    * specified that they are okay with receiving partial results)
590    * @param resultsFromServer The array of {@link Result}s returned from the server
591    * @param heartbeatMessage Flag indicating whether or not the response received from the server
592    *          represented a complete response, or a heartbeat message that was sent to keep the
593    *          client-server connection alive
594    * @return the list of results that should be added to the cache.
595    * @throws IOException
596    */
597   protected List<Result>
598       getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
599           throws IOException {
600     int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
601     List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
602 
603     final boolean isBatchSet = scan != null && scan.getBatch() > 0;
604     final boolean allowPartials = scan != null && scan.getAllowPartialResults();
605 
606     // If the caller has indicated in their scan that they are okay with seeing partial results,
607     // then simply add all results to the list. Note that since scan batching also returns results
608     // for a row in pieces we treat batch being set as equivalent to allowing partials. The
609     // implication of treating batching as equivalent to partial results is that it is possible
610     // the caller will receive a result back where the number of cells in the result is less than
611     // the batch size even though it may not be the last group of cells for that row.
612     if (allowPartials || isBatchSet) {
613       addResultsToList(resultsToAddToCache, resultsFromServer, 0,
614           (null == resultsFromServer ? 0 : resultsFromServer.length));
615       return resultsToAddToCache;
616     }
617 
618     // If no results were returned it indicates that either we have the all the partial results
619     // necessary to construct the complete result or the server had to send a heartbeat message
620     // to the client to keep the client-server connection alive
621     if (resultsFromServer == null || resultsFromServer.length == 0) {
622       // If this response was an empty heartbeat message, then we have not exhausted the region
623       // and thus there may be more partials server side that still need to be added to the partial
624       // list before we form the complete Result
625       if (!partialResults.isEmpty() && !heartbeatMessage) {
626         resultsToAddToCache.add(Result.createCompleteResult(partialResults));
627         clearPartialResults();
628       }
629 
630       return resultsToAddToCache;
631     }
632 
633     // In every RPC response there should be at most a single partial result. Furthermore, if
634     // there is a partial result, it is guaranteed to be in the last position of the array.
635     Result last = resultsFromServer[resultsFromServer.length - 1];
636     Result partial = last.isPartial() ? last : null;
637 
638     if (LOG.isTraceEnabled()) {
639       StringBuilder sb = new StringBuilder();
640       sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
641       sb.append("partial != null: ").append(partial != null).append(",");
642       sb.append("number of partials so far: ").append(partialResults.size());
643       LOG.trace(sb.toString());
644     }
645 
646     // There are three possibilities cases that can occur while handling partial results
647     //
648     // 1. (partial != null && partialResults.isEmpty())
649     // This is the first partial result that we have received. It should be added to
650     // the list of partialResults and await the next RPC request at which point another
651     // portion of the complete result will be received
652     //
653     // 2. !partialResults.isEmpty()
654     // Since our partialResults list is not empty it means that we have been accumulating partial
655     // Results for a particular row. We cannot form the complete/whole Result for that row until
656     // all partials for the row have been received. Thus we loop through all of the Results
657     // returned from the server and determine whether or not all partial Results for the row have
658     // been received. We know that we have received all of the partial Results for the row when:
659     // i) We notice a row change in the Results
660     // ii) We see a Result for the partial row that is NOT marked as a partial Result
661     //
662     // 3. (partial == null && partialResults.isEmpty())
663     // Business as usual. We are not accumulating partial results and there wasn't a partial result
664     // in the RPC response. This means that all of the results we received from the server are
665     // complete and can be added directly to the cache
666     if (partial != null && partialResults.isEmpty()) {
667       addToPartialResults(partial);
668 
669       // Exclude the last result, it's a partial
670       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
671     } else if (!partialResults.isEmpty()) {
672       for (int i = 0; i < resultsFromServer.length; i++) {
673         Result result = resultsFromServer[i];
674 
675         // This result is from the same row as the partial Results. Add it to the list of partials
676         // and check if it was the last partial Result for that row
677         if (Bytes.equals(partialResultsRow, result.getRow())) {
678           addToPartialResults(result);
679 
680           // If the result is not a partial, it is a signal to us that it is the last Result we
681           // need to form the complete Result client-side
682           if (!result.isPartial()) {
683             resultsToAddToCache.add(Result.createCompleteResult(partialResults));
684             clearPartialResults();
685           }
686         } else {
687           // The row of this result differs from the row of the partial results we have received so
688           // far. If our list of partials isn't empty, this is a signal to form the complete Result
689           // since the row has now changed
690           if (!partialResults.isEmpty()) {
691             resultsToAddToCache.add(Result.createCompleteResult(partialResults));
692             clearPartialResults();
693           }
694 
695           // It's possible that in one response from the server we receive the final partial for
696           // one row and receive a partial for a different row. Thus, make sure that all Results
697           // are added to the proper list
698           if (result.isPartial()) {
699             addToPartialResults(result);
700           } else {
701             resultsToAddToCache.add(result);
702           }
703         }
704       }
705     } else { // partial == null && partialResults.isEmpty() -- business as usual
706       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
707     }
708 
709     return resultsToAddToCache;
710   }
711 
712   /**
713    * A convenience method for adding a Result to our list of partials. This method ensure that only
714    * Results that belong to the same row as the other partials can be added to the list.
715    * @param result The result that we want to add to our list of partial Results
716    * @throws IOException
717    */
718   private void addToPartialResults(final Result result) throws IOException {
719     final byte[] row = result.getRow();
720     if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
721       throw new IOException("Partial result row does not match. All partial results must come "
722           + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: "
723           + Bytes.toString(row));
724     }
725     partialResultsRow = row;
726     partialResults.add(result);
727   }
728 
729   /**
730    * Convenience method for clearing the list of partials and resetting the partialResultsRow.
731    */
732   private void clearPartialResults() {
733     partialResults.clear();
734     partialResultsRow = null;
735   }
736 
737   /**
738    * Helper method for adding results between the indices [start, end) to the outputList
739    * @param outputList the list that results will be added to
740    * @param inputArray the array that results are taken from
741    * @param start beginning index (inclusive)
742    * @param end ending index (exclusive)
743    */
744   private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
745     if (inputArray == null || start < 0 || end > inputArray.length) return;
746 
747     for (int i = start; i < end; i++) {
748       outputList.add(inputArray[i]);
749     }
750   }
751 
752     @Override
753     public void close() {
754       if (!scanMetricsPublished) writeScanMetrics();
755       if (callable != null) {
756         callable.setClose();
757         try {
758           call(callable, caller, scannerTimeout);
759         } catch (UnknownScannerException e) {
760            // We used to catch this error, interpret, and rethrow. However, we
761            // have since decided that it's not nice for a scanner's close to
762            // throw exceptions. Chances are it was just due to lease time out.
763           if (LOG.isDebugEnabled()) {
764             LOG.debug("scanner failed to close", e);
765           }
766         } catch (IOException e) {
767           /* An exception other than UnknownScanner is unexpected. */
768           LOG.warn("scanner failed to close.", e);
769         }
770         callable = null;
771       }
772       closed = true;
773     }
774 
775   /**
776    * Create the closest row before the specified row
777    * @param row
778    * @return a new byte array which is the closest front row of the specified one
779    */
780   protected static byte[] createClosestRowBefore(byte[] row) {
781     if (row == null) {
782       throw new IllegalArgumentException("The passed row is empty");
783     }
784     if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
785       return MAX_BYTE_ARRAY;
786     }
787     if (row[row.length - 1] == 0) {
788       return Arrays.copyOf(row, row.length - 1);
789     } else {
790       byte[] closestFrontRow = Arrays.copyOf(row, row.length);
791       closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
792       closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
793       return closestFrontRow;
794     }
795   }
796 
797   @Override
798   public boolean renewLease() {
799     if (callable != null) {
800       // do not return any rows, do not advance the scanner
801       callable.setRenew(true);
802       try {
803         this.caller.callWithoutRetries(callable, this.scannerTimeout);
804       } catch (Exception e) {
805         return false;
806       } finally {
807         callable.setRenew(false);
808       }
809       return true;
810     }
811     return false;
812   }
813 
814   protected void updateLastCellLoadedToCache(Result result) {
815     if (result.rawCells().length == 0) {
816       return;
817     }
818     this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
819   }
820 
821   /**
822    * Compare two Cells considering reversed scanner.
823    * ReversedScanner only reverses rows, not columns.
824    */
825   private int compare(Cell a, Cell b) {
826     CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion() ?
827         CellComparator.META_COMPARATOR : CellComparator.COMPARATOR;
828     int r = comparator.compareRows(a, b);
829     if (r != 0) {
830       return this.scan.isReversed() ? -r : r;
831     }
832     return CellComparator.compareWithoutRow(a, b);
833   }
834 
835   private Result filterLoadedCell(Result result) {
836     // we only filter result when last result is partial
837     // so lastCellLoadedToCache and result should have same row key.
838     // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region;
839     // 3) read more cell. lastCellLoadedToCache and result will be not at same row.
840     if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
841       return result;
842     }
843     if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
844       // The first cell of this result is larger than the last cell of loadcache.
845       // If user do not allow partial result, it must be true.
846       return result;
847     }
848     if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
849       // The last cell of this result is smaller than the last cell of loadcache, skip all.
850       return null;
851     }
852 
853     // The first one must not in filtered result, we start at the second.
854     int index = 1;
855     while (index < result.rawCells().length) {
856       if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
857         break;
858       }
859       index++;
860     }
861     Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
862     return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
863   }
864 }