
1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.concurrent.ExecutorService;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.KeyValue.MetaComparator;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellComparator;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.DoNotRetryIOException;
37  import org.apache.hadoop.hbase.HBaseConfiguration;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HRegionInfo;
40  import org.apache.hadoop.hbase.NotServingRegionException;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.UnknownScannerException;
43  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
44  import org.apache.hadoop.hbase.exceptions.ScannerResetException;
45  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
48  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
49  import org.apache.hadoop.hbase.util.Bytes;
50  
51  import com.google.common.annotations.VisibleForTesting;
52  
53  /**
54   * Implements the scanner interface for the HBase client.
55   * If there are multiple regions in a table, this scanner will iterate
56   * through them all.
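   * <p>A minimal usage sketch (illustrative only): applications normally obtain a scanner
   * via {@code Table#getScanner(Scan)} rather than constructing this class directly.
   * <pre>{@code
   * // 'table' is an assumed org.apache.hadoop.hbase.client.Table instance
   * try (ResultScanner scanner = table.getScanner(new Scan())) {
   *   for (Result result : scanner) {
   *     // each Result is one (possibly stitched-together) row
   *   }
   * }
   * }</pre>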
57   */
58  @InterfaceAudience.Private
59  public class ClientScanner extends AbstractClientScanner {
60      private static final Log LOG = LogFactory.getLog(ClientScanner.class);
61      // A byte array in which all elements are the max byte; it is used to
62      // construct the closest front row
63      static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
64      protected Scan scan;
65      protected boolean closed = false;
66      // Current region scanner is against.  Gets cleared if current region goes
67      // wonky: e.g. if it splits on us.
68      protected HRegionInfo currentRegion = null;
69      protected ScannerCallableWithReplicas callable = null;
70      protected final LinkedList<Result> cache = new LinkedList<Result>();
71      /**
72       * A list of partial results that have been returned from the server. This list should only
73       * contain results if this scanner does not have enough partial results to form the complete
74       * result.
75       */
76      protected final LinkedList<Result> partialResults = new LinkedList<Result>();
77      /**
78       * The row for which we are accumulating partial Results (i.e. the row of the Results stored
79       * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync
80       * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
81       */
82      protected byte[] partialResultsRow = null;
83      /**
84       * The last cell from a partially-loaded row that was added to the cache
85       */
86      protected Cell lastCellLoadedToCache = null;
87      protected final int caching;
88      protected long lastNext;
89      // Keep lastResult returned successfully in case we have to reset scanner.
90      protected Result lastResult = null;
91      protected final long maxScannerResultSize;
92      private final ClusterConnection connection;
93      private final TableName tableName;
94      protected final int scannerTimeout;
95      protected boolean scanMetricsPublished = false;
96      protected RpcRetryingCaller<Result []> caller;
97      protected RpcControllerFactory rpcControllerFactory;
98      protected Configuration conf;
99      //The timeout on the primary. Applicable if there are multiple replicas for a region
100     //In that case, we will only wait for this much timeout on the primary before going
101     //to the replicas and trying the same scan. Note that the retries will still happen
102     //on each replica and the first successful results will be taken. A timeout of 0 is
103     //disallowed.
104     protected final int primaryOperationTimeout;
105     private int retries;
106     protected final ExecutorService pool;
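        // Comparator used when this scanner runs against hbase:meta; meta row keys need the
        // special MetaComparator for correct ordering (see compare(Cell, Cell) below).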
107     private static MetaComparator metaComparator = new MetaComparator();
108 
109   /**
110    * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s start
111    * row may be changed.
112    * @param conf The {@link Configuration} to use.
113    * @param scan {@link Scan} to use in this scanner
114    * @param tableName The table that we wish to scan
115    * @param connection Connection identifying the cluster
116    * @throws IOException
117    */
118   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
119       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
120       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
121       throws IOException {
122       if (LOG.isTraceEnabled()) {
123         LOG.trace("Scan table=" + tableName
124             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
125       }
126       this.scan = scan;
127       this.tableName = tableName;
128       this.lastNext = System.currentTimeMillis();
129       this.connection = connection;
130       this.pool = pool;
131       this.primaryOperationTimeout = primaryOperationTimeout;
132       this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
133           HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
134       if (scan.getMaxResultSize() > 0) {
135         this.maxScannerResultSize = scan.getMaxResultSize();
136       } else {
137         this.maxScannerResultSize = conf.getLong(
138           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
139           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
140       }
141       this.scannerTimeout = HBaseConfiguration.getInt(conf,
142         HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
143         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
144         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
145 
146       // check if application wants to collect scan metrics
147       initScanMetrics(scan);
148 
149       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
150       if (this.scan.getCaching() > 0) {
151         this.caching = this.scan.getCaching();
152       } else {
153         this.caching = conf.getInt(
154             HConstants.HBASE_CLIENT_SCANNER_CACHING,
155             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
156       }
157 
158       this.caller = rpcFactory.<Result[]> newCaller();
159       this.rpcControllerFactory = controllerFactory;
160 
161       this.conf = conf;
162       initializeScannerInConstruction();
163     }
164 
165     protected void initializeScannerInConstruction() throws IOException{
166       // initialize the scanner
167       nextScanner(this.caching, false);
168     }
169 
170     protected ClusterConnection getConnection() {
171       return this.connection;
172     }
173 
174     /**
175      * @return Table name
176      * @deprecated As of release 0.96
177      *             (<a href="https://issues.apache.org/jira/browse/HBASE-9508">HBASE-9508</a>).
178      *             This will be removed in HBase 2.0.0. Use {@link #getTable()}.
179      */
180     @Deprecated
181     protected byte [] getTableName() {
182       return this.tableName.getName();
183     }
184 
185     protected TableName getTable() {
186       return this.tableName;
187     }
188 
189     protected int getRetries() {
190       return this.retries;
191     }
192 
193     protected int getScannerTimeout() {
194       return this.scannerTimeout;
195     }
196 
197     protected Configuration getConf() {
198       return this.conf;
199     }
200 
201     protected Scan getScan() {
202       return scan;
203     }
204 
205     protected ExecutorService getPool() {
206       return pool;
207     }
208 
209     protected int getPrimaryOperationTimeout() {
210       return primaryOperationTimeout;
211     }
212 
213     protected int getCaching() {
214       return caching;
215     }
216 
217     protected long getTimestamp() {
218       return lastNext;
219     }
220 
221     @VisibleForTesting
222     protected long getMaxResultSize() {
223       return maxScannerResultSize;
224     }
225 
226     // Returns true if the scan's stop row is at or before the passed region endKey,
        // i.e. the scan does not need to continue past the current region.
227     protected boolean checkScanStopRow(final byte [] endKey) {
228       if (this.scan.getStopRow().length > 0) {
229         // there is a stop row, check to see if we are past it.
230         byte [] stopRow = scan.getStopRow();
231         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
232           endKey, 0, endKey.length);
233         if (cmp <= 0) {
234           // stopRow <= endKey (endKey is equal to or larger than stopRow)
235           // This is a stop.
236           return true;
237         }
238       }
239       return false; //unlikely.
240     }
241 
242     private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
243       // If we have just switched replica, don't go to the next scanner yet. Rather, try
244       // the scanner operations on the new replica, from the right point in the scan
245       // Note that when we switched to a different replica we left it at a point
246       // where we just did the "openScanner" with the appropriate startrow
247       if (callable != null && callable.switchedToADifferentReplica()) return true;
248       return nextScanner(nbRows, done);
249     }
250 
251     /*
252      * Gets a scanner for the next region.  If this.currentRegion != null, then
253      * we will move to the endrow of this.currentRegion.  Else we will get
254      * scanner at the scan.getStartRow().  We will go no further, just tidy
255      * up outstanding scanners, if <code>currentRegion != null</code> and
256      * <code>done</code> is true.
257      * @param nbRows the number of rows to ask for per RPC (the caching value)
258      * @param done Server-side says we're done scanning.
259      */
260   protected boolean nextScanner(int nbRows, final boolean done)
261     throws IOException {
262       // Close the previous scanner if it's open
263       if (this.callable != null) {
264         this.callable.setClose();
265         call(callable, caller, scannerTimeout);
266         this.callable = null;
267       }
268 
269       // Where to start the next scanner
270       byte [] localStartKey;
271 
272       // if we're at end of table, close and return false to stop iterating
273       if (this.currentRegion != null) {
274         byte [] endKey = this.currentRegion.getEndKey();
275         if (endKey == null ||
276             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
277             checkScanStopRow(endKey) ||
278             done) {
279           close();
280           if (LOG.isTraceEnabled()) {
281             LOG.trace("Finished " + this.currentRegion);
282           }
283           return false;
284         }
285         localStartKey = endKey;
286         if (LOG.isTraceEnabled()) {
287           LOG.trace("Finished " + this.currentRegion);
288         }
289       } else {
290         localStartKey = this.scan.getStartRow();
291       }
292 
293       if (LOG.isDebugEnabled() && this.currentRegion != null) {
294         // Only worth logging if NOT first region in scan.
295         LOG.debug("Advancing internal scanner to startKey at '" +
296           Bytes.toStringBinary(localStartKey) + "'");
297       }
298       try {
299         callable = getScannerCallable(localStartKey, nbRows);
300         // Open a scanner on the region server starting at the
301         // beginning of the region
302         call(callable, caller, scannerTimeout);
303         this.currentRegion = callable.getHRegionInfo();
304         if (this.scanMetrics != null) {
305           this.scanMetrics.countOfRegions.incrementAndGet();
306         }
307       } catch (IOException e) {
308         close();
309         throw e;
310       }
311       return true;
312     }
313 
314   @VisibleForTesting
315   boolean isAnyRPCcancelled() {
316     return callable.isAnyRPCcancelled();
317   }
318 
319   Result[] call(ScannerCallableWithReplicas callable,
320       RpcRetryingCaller<Result[]> caller, int scannerTimeout)
321       throws IOException, RuntimeException {
322     if (Thread.interrupted()) {
323       throw new InterruptedIOException();
324     }
325     // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
326     // we do a callWithRetries
327     return caller.callWithoutRetries(callable, scannerTimeout);
328   }
329 
330     @InterfaceAudience.Private
331     protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
332         int nbRows) {
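      // Move the Scan's start row to the boundary computed by nextScanner(), then wrap the
      // per-region ScannerCallable so the read can go to region replicas if the primary
      // does not respond within primaryOperationTimeout.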
333       scan.setStartRow(localStartKey);
334       ScannerCallable s =
335           new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
336               this.rpcControllerFactory);
337       s.setCaching(nbRows);
338       ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
339        s, pool, primaryOperationTimeout, scan,
340        retries, scannerTimeout, caching, conf, caller);
341       return sr;
342     }
343 
344     /**
345      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
346      * application or TableInputFormat. Later, we could push it to other systems. We don't use
347      * metrics framework because it doesn't support multi-instances of the same metrics on the same
348      * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
349      *
350      * By default, scan metrics are disabled; if the application wants to collect them, this
351      * behavior can be turned on by calling {@link Scan#setScanMetricsEnabled(boolean)}
352      * 
353      * <p>This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
354      */
355     protected void writeScanMetrics() {
356       if (this.scanMetrics == null || scanMetricsPublished) {
357         return;
358       }
359       MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
360       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
361       scanMetricsPublished = true;
362     }
363 
364     @Override
365     public Result next() throws IOException {
366       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
367       if (cache.size() == 0 && this.closed) {
368         return null;
369       }
370       if (cache.size() == 0) {
371         loadCache();
372       }
373 
374       if (cache.size() > 0) {
375         return cache.poll();
376       }
377 
378       // if we exhausted this scanner before calling close, write out the scan metrics
379       writeScanMetrics();
380       return null;
381     }
382 
383   @VisibleForTesting
384   public int getCacheSize() {
385     return cache != null ? cache.size() : 0;
386   }
387 
388   /**
389    * Contact the servers to load more {@link Result}s in the cache.
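   * <p>The fetch loop issues one or more RPCs, stitching partial Results into whole rows,
   * and stops once enough Results are cached (per the caching count and max result size),
   * a heartbeat arrives while the cache is non-empty, or the scan is done.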
390    */
391   protected void loadCache() throws IOException {
392     Result[] values = null;
393     long remainingResultSize = maxScannerResultSize;
394     int countdown = this.caching;
395     // We need to reset it if it's a new callable that was created with a countdown in nextScanner
396     callable.setCaching(this.caching);
397     // This flag lets us tolerate a single OutOfOrderScannerNextException between successful
398     // calls (e.g. after the scanner is reset because the region split under us); a second
        // consecutive one is rethrown.
399     boolean retryAfterOutOfOrderException = true;
400     // We don't expect the server to have more results for us unless it tells
401     // us otherwise; we rely on the size or count of the returned results
402     boolean serverHasMoreResults = false;
403     boolean allResultsSkipped = false;
404     // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should
405     // make sure that we are not retrying indefinitely.
406     int retriesLeft = getRetries();
407     do {
408       allResultsSkipped = false;
409       try {
410         // Server returns null if scanning is to stop. Else, it returns an empty
411         // array if scanning is to go on and we've just exhausted the current region.
413         values = call(callable, caller, scannerTimeout);
414         // When the replica switch happens, we need to do certain operations again.
415         // The callable will openScanner with the right startkey but we need to pick up
416         // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
417         // of the loop as it happens for the cases where we see exceptions.
418         // Since only openScanner would have happened, values would be null
419         if (values == null && callable.switchedToADifferentReplica()) {
420           // Any accumulated partial results are no longer valid since the callable will
421           // openScanner with the correct startkey and we must pick up from there
422           clearPartialResults();
423           this.currentRegion = callable.getHRegionInfo();
424           continue;
425         }
426         retryAfterOutOfOrderException = true;
427       } catch (DoNotRetryIOException | NeedUnmanagedConnectionException e) {
428         // An exception was thrown which makes any partial results that we were collecting
429         // invalid. The scanner will need to be reset to the beginning of a row.
430         clearPartialResults();
431 
432         // Unfortunately, DNRIOE is used with two different semantics.
433         // (1) The first is to close the client scanner and bubble up the exception all the way
434         // to the application. This is preferred when the exception is really un-recoverable
435         // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
436         // bucket usually.
437         // (2) The second is to close the current region scanner only, but continue the
438         // client scanner by overriding the exception. This is usually UnknownScannerException,
439         // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
440         // application-level ClientScanner has to continue without bubbling up the exception to
441         // the client. See RSRpcServices to see how it throws DNRIOE's.
442         // See also: HBASE-16604, HBASE-17187
443 
444         // If exception is any but the list below throw it back to the client; else setup
445         // the scanner and retry.
446         Throwable cause = e.getCause();
447         if ((cause != null && cause instanceof NotServingRegionException) ||
448             (cause != null && cause instanceof RegionServerStoppedException) ||
449             e instanceof OutOfOrderScannerNextException ||
450             e instanceof UnknownScannerException ||
451             e instanceof ScannerResetException) {
452           // Pass. It is easier to write the condition as a list of what is allowed rather than
453           // as a list of what is not allowed... so if we are in here, we do not throw.
454           if (retriesLeft-- <= 0) {
455             throw e; // no more retries
456           }
457         } else {
458           throw e;
459         }
460 
461         // Else, it's a signal from the depths of ScannerCallable that we need to reset the scanner.
462         if (this.lastResult != null) {
463           // The region has moved. We need to open a brand new scanner at the new location.
464           // Reset the startRow to the row we've seen last so that the new scanner starts at
465           // the correct row. Otherwise we may see previously returned rows again.
466           // (ScannerCallable by now has "relocated" the correct region)
467           if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
468             if (scan.isReversed()) {
469               scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
470             } else {
471               scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
472             }
473           } else {
474             // we need to rescan this row because we only loaded a partial row before
475             scan.setStartRow(lastResult.getRow());
476           }
477         }
478         if (e instanceof OutOfOrderScannerNextException) {
479           if (retryAfterOutOfOrderException) {
480             retryAfterOutOfOrderException = false;
481           } else {
482             // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
483             throw new DoNotRetryIOException("Failed after retry of " +
484                 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
485           }
486         }
487         // Clear region.
488         this.currentRegion = null;
489         // Set this to zero so we don't try and do an rpc and close on remote server when
490         // the exception we got was UnknownScanner or the Server is going down.
491         callable = null;
492         // This continue will take us to while at end of loop where we will set up new scanner.
493         continue;
494       }
495       long currentTime = System.currentTimeMillis();
496       if (this.scanMetrics != null) {
497         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
498       }
499       lastNext = currentTime;
500       // Groom the array of Results that we received back from the server before adding those
501       // Results to the scanner's cache. If partial results are not allowed to be seen by the
502       // caller, all bookkeeping will be performed within this method.
503       List<Result> resultsToAddToCache =
504           getResultsToAddToCache(values, callable.isHeartbeatMessage());
505       if (!resultsToAddToCache.isEmpty()) {
506         for (Result rs : resultsToAddToCache) {
507           rs = filterLoadedCell(rs);
508           if (rs == null) {
509             continue;
510           }
511           cache.add(rs);
512           for (Cell cell : rs.rawCells()) {
513             remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
514           }
515           countdown--;
516           this.lastResult = rs;
517           if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
518             updateLastCellLoadedToCache(this.lastResult);
519           } else {
520             this.lastCellLoadedToCache = null;
521           }
522         }
523         if (cache.isEmpty()) {
524           // all results have been seen before; we need to scan more.
525           allResultsSkipped = true;
526           continue;
527         }
528       }
529       if (callable.isHeartbeatMessage()) {
530         if (cache.size() > 0) {
531           // Caller of this method just wants a Result. If we see a heartbeat message, it means
532           // processing of the scan is taking a long time server side. Rather than continue to
533           // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
534           // unnecessary delays to the caller
535           if (LOG.isTraceEnabled()) {
536             LOG.trace("Heartbeat message received and cache contains Results."
537                     + " Breaking out of scan loop");
538           }
539           break;
540         }
541         continue;
542       }
543 
544       // We expect that the server won't have more results for us when we exhaust
545       // the size (bytes or count) of the results returned. If the server *does* inform us that
546       // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
547       // get results is the moreResults context valid.
548       if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
549         // Only adhere to more server results when we don't have any partialResults
550         // as it keeps the outer loop logic the same.
551         serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
552       }
553       // Values == null means server-side filter has determined we must STOP
554       // !partialResults.isEmpty() means that we are still accumulating partial Results for a
555       // row. We should not change scanners before we receive all the partial Results for that
556       // row.
557     } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
558         || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
559         && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
560   }
561 
562   /**
563    * @param remainingResultSize the remaining result-size budget, in bytes
564    * @param remainingRows the remaining number of rows we may fetch (caching budget)
565    * @param regionHasMoreResults whether the server indicated it has more results for this region
566    * @return true when the current region has been exhausted. When the current region has been
567    *         exhausted, the region must be changed before scanning can continue
568    */
569   private boolean doneWithRegion(long remainingResultSize, int remainingRows,
570       boolean regionHasMoreResults) {
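    // We are done with this region only if we still have size and row budget left, yet the
    // server did not claim to have more results for it.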
571     return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
572   }
573 
574   /**
575    * This method ensures all of our bookkeeping regarding partial results is kept up to date. This
576    * method should be called once we know that the results we received back from the RPC request do
577    * not contain errors. We return a list of results that should be added to the cache. In general,
578    * this list will contain all NON-partial results from the input array (unless the client has
579    * specified that they are okay with receiving partial results)
580    * @param resultsFromServer The array of {@link Result}s returned from the server
581    * @param heartbeatMessage Flag indicating whether or not the response received from the server
582    *          represented a complete response, or a heartbeat message that was sent to keep the
583    *          client-server connection alive
584    * @return the list of results that should be added to the cache.
585    * @throws IOException
586    */
587   protected List<Result>
588       getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
589           throws IOException {
590     int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
591     List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
592 
593     final boolean isBatchSet = scan != null && scan.getBatch() > 0;
594     final boolean allowPartials = scan != null && scan.getAllowPartialResults();
595 
596     // If the caller has indicated in their scan that they are okay with seeing partial results,
597     // then simply add all results to the list. Note that since scan batching also returns results
598     // for a row in pieces we treat batch being set as equivalent to allowing partials. The
599     // implication of treating batching as equivalent to partial results is that it is possible
600     // the caller will receive a result back where the number of cells in the result is less than
601     // the batch size even though it may not be the last group of cells for that row.
602     if (allowPartials || isBatchSet) {
603       addResultsToList(resultsToAddToCache, resultsFromServer, 0,
604           (null == resultsFromServer ? 0 : resultsFromServer.length));
605       return resultsToAddToCache;
606     }
607 
608     // If no results were returned it indicates that either we have all the partial results
609     // necessary to construct the complete result or the server had to send a heartbeat message
610     // to the client to keep the client-server connection alive
611     if (resultsFromServer == null || resultsFromServer.length == 0) {
612       // If this response was an empty heartbeat message, then we have not exhausted the region
613       // and thus there may be more partials server side that still need to be added to the partial
614       // list before we form the complete Result
615       if (!partialResults.isEmpty() && !heartbeatMessage) {
616         resultsToAddToCache.add(Result.createCompleteResult(partialResults));
617         clearPartialResults();
618       }
619 
620       return resultsToAddToCache;
621     }
622 
623     // In every RPC response there should be at most a single partial result. Furthermore, if
624     // there is a partial result, it is guaranteed to be in the last position of the array.
625     Result last = resultsFromServer[resultsFromServer.length - 1];
626     Result partial = last.isPartial() ? last : null;
627 
628     if (LOG.isTraceEnabled()) {
629       StringBuilder sb = new StringBuilder();
630       sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
631       sb.append("partial != null: ").append(partial != null).append(",");
632       sb.append("number of partials so far: ").append(partialResults.size());
633       LOG.trace(sb.toString());
634     }
635 
636     // There are three possible cases that can occur while handling partial results
637     //
638     // 1. (partial != null && partialResults.isEmpty())
639     // This is the first partial result that we have received. It should be added to
640     // the list of partialResults and await the next RPC request at which point another
641     // portion of the complete result will be received
642     //
643     // 2. !partialResults.isEmpty()
644     // Since our partialResults list is not empty it means that we have been accumulating partial
645     // Results for a particular row. We cannot form the complete/whole Result for that row until
646     // all partials for the row have been received. Thus we loop through all of the Results
647     // returned from the server and determine whether or not all partial Results for the row have
648     // been received. We know that we have received all of the partial Results for the row when:
649     // i) We notice a row change in the Results
650     // ii) We see a Result for the partial row that is NOT marked as a partial Result
651     //
652     // 3. (partial == null && partialResults.isEmpty())
653     // Business as usual. We are not accumulating partial results and there wasn't a partial result
654     // in the RPC response. This means that all of the results we received from the server are
655     // complete and can be added directly to the cache
656     if (partial != null && partialResults.isEmpty()) {
657       addToPartialResults(partial);
658 
659       // Exclude the last result, it's a partial
660       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
661     } else if (!partialResults.isEmpty()) {
662       for (int i = 0; i < resultsFromServer.length; i++) {
663         Result result = resultsFromServer[i];
664 
665         // This result is from the same row as the partial Results. Add it to the list of partials
666         // and check if it was the last partial Result for that row
667         if (Bytes.equals(partialResultsRow, result.getRow())) {
668           addToPartialResults(result);
669 
670           // If the result is not a partial, it is a signal to us that it is the last Result we
671           // need to form the complete Result client-side
672           if (!result.isPartial()) {
673             resultsToAddToCache.add(Result.createCompleteResult(partialResults));
674             clearPartialResults();
675           }
676         } else {
677           // The row of this result differs from the row of the partial results we have received so
678           // far. If our list of partials isn't empty, this is a signal to form the complete Result
679           // since the row has now changed
680           if (!partialResults.isEmpty()) {
681             resultsToAddToCache.add(Result.createCompleteResult(partialResults));
682             clearPartialResults();
683           }
684 
685           // It's possible that in one response from the server we receive the final partial for
686           // one row and receive a partial for a different row. Thus, make sure that all Results
687           // are added to the proper list
688           if (result.isPartial()) {
689             addToPartialResults(result);
690           } else {
691             resultsToAddToCache.add(result);
692           }
693         }
694       }
695     } else { // partial == null && partialResults.isEmpty() -- business as usual
696       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
697     }
698 
699     return resultsToAddToCache;
700   }
701 
702   /**
703    * A convenience method for adding a Result to our list of partials. This method ensures that only
704    * Results that belong to the same row as the other partials can be added to the list.
705    * @param result The result that we want to add to our list of partial Results
706    * @throws IOException
707    */
708   private void addToPartialResults(final Result result) throws IOException {
709     final byte[] row = result.getRow();
710     if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
711       throw new IOException("Partial result row does not match. All partial results must come "
712           + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + ", row: "
713           + Bytes.toString(row));
714     }
715     partialResultsRow = row;
716     partialResults.add(result);
717   }
718 
719   /**
720    * Convenience method for clearing the list of partials and resetting the partialResultsRow.
721    */
722   private void clearPartialResults() {
723     partialResults.clear();
724     partialResultsRow = null;
725   }
726 
727   /**
728    * Helper method for adding results between the indices [start, end) to the outputList
729    * @param outputList the list that results will be added to
730    * @param inputArray the array that results are taken from
731    * @param start beginning index (inclusive)
732    * @param end ending index (exclusive)
733    */
734   private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
735     if (inputArray == null || start < 0 || end > inputArray.length) return;
736 
737     for (int i = start; i < end; i++) {
738       outputList.add(inputArray[i]);
739     }
740   }
741 
742     @Override
743     public void close() {
744       if (!scanMetricsPublished) writeScanMetrics();
745       if (callable != null) {
746         callable.setClose();
747         try {
748           call(callable, caller, scannerTimeout);
749         } catch (UnknownScannerException e) {
750            // We used to catch this error, interpret, and rethrow. However, we
751            // have since decided that it's not nice for a scanner's close to
752            // throw exceptions. Chances are it was just due to lease time out.
753         } catch (IOException e) {
754            /* An exception other than UnknownScanner is unexpected. */
755            LOG.warn("scanner failed to close. Exception follows: " + e);
756         }
757         callable = null;
758       }
759       closed = true;
760     }
761 
762   /**
763    * Create the closest row before the specified row
764    * @param row the row to compute the closest front row for
765    * @return a new byte array which is the closest front row of the specified one
766    */
767   protected static byte[] createClosestRowBefore(byte[] row) {
768     if (row == null) {
769       throw new IllegalArgumentException("The passed row is null");
770     }
771     if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
772       return MAX_BYTE_ARRAY;
773     }
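    // If the row ends in 0x00, the closest row before it is that row with the trailing
    // zero byte dropped.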
774     if (row[row.length - 1] == 0) {
775       return Arrays.copyOf(row, row.length - 1);
776     } else {
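      // Otherwise decrement the last byte and append MAX_BYTE_ARRAY so the result sorts
      // close to, and before, the original row.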
777       byte[] closestFrontRow = Arrays.copyOf(row, row.length);
778       closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
779       closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
780       return closestFrontRow;
781     }
782   }
783 
784   @Override
785   public boolean renewLease() {
786     if (callable != null) {
787       // do not return any rows, do not advance the scanner
788       callable.setRenew(true);
789       try {
790         this.caller.callWithoutRetries(callable, this.scannerTimeout);
791       } catch (Exception e) {
792         return false;
793       } finally {
794         callable.setRenew(false);
795       }
796       return true;
797     }
798     return false;
799   }
800 
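  // Track the last cell handed to the cache so filterLoadedCell() can drop cells that were
  // already returned before the scanner had to be restarted mid-row.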
801   protected void updateLastCellLoadedToCache(Result result) {
802     if (result.rawCells().length == 0) {
803       return;
804     }
805     this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
806   }
807 
808   /**
809    * Compare two Cells considering reversed scanner.
810    * ReversedScanner only reverses rows, not columns.
811    */
812   private int compare(Cell a, Cell b) {
813     int r = 0;
814     if (currentRegion != null && currentRegion.isMetaRegion()) {
815       r = metaComparator.compareRows(a, b);
816     } else {
817       r = CellComparator.compareRows(a, b);
818     }
819     if (r != 0) {
820       return this.scan.isReversed() ? -r : r;
821     }
822     return CellComparator.compareWithoutRow(a, b);
823   }
824 
825   private Result filterLoadedCell(Result result) {
826     // We only filter results when the last result was partial, so lastCellLoadedToCache
827     // and result should have the same row key. However, if 1) we read some cells,
828     // 1.1) the row is deleted concurrently, 2) the region moves, and 3) we then read more
829     // cells, lastCellLoadedToCache and result may not be on the same row.
830     if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
831       return result;
832     }
833     if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
834       // The first cell of this result is larger than the last cell loaded into the cache,
835       // so the whole Result is new. If the user does not allow partial results, this is always the case.
836       return result;
837     }
838     if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
839       // The last cell of this result is not larger than the last cell already loaded into the cache; skip it all.
840       return null;
841     }
842 
843     // The first cell cannot be in the filtered result, so we start at the second.
844     int index = 1;
845     while (index < result.rawCells().length) {
846       if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
847         break;
848       }
849       index++;
850     }
851     Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
852     return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
853   }
854 }