View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import com.google.common.annotations.VisibleForTesting;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellUtil;
26  import org.apache.hadoop.hbase.DoNotRetryIOException;
27  import org.apache.hadoop.hbase.HBaseConfiguration;
28  import org.apache.hadoop.hbase.HConstants;
29  import org.apache.hadoop.hbase.HRegionInfo;
30  import org.apache.hadoop.hbase.NotServingRegionException;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.UnknownScannerException;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
35  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
38  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
39  import org.apache.hadoop.hbase.util.Bytes;
40  
41  import java.io.IOException;
42  import java.io.InterruptedIOException;
43  import java.util.ArrayList;
44  import java.util.Arrays;
45  import java.util.LinkedList;
46  import java.util.List;
47  import java.util.Queue;
48  import java.util.concurrent.ExecutorService;
49  
50  /**
51   * Implements the scanner interface for the HBase client.
52   * If there are multiple regions in a table, this scanner will iterate
53   * through them all.
54   */
55  @InterfaceAudience.Private
56  public abstract class ClientScanner extends AbstractClientScanner {
  private static final Log LOG = LogFactory.getLog(ClientScanner.class);
  // A byte array (length 9) in which all elements are the max byte, and it is used to
  // construct the closest row in front of a given row (presumably by createClosestRowBefore,
  // which loadCache() uses for reversed scans -- confirm).
  // NOTE(review): this is neither final nor immutable; callers must never modify or reassign it.
  static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
  // The Scan driving this scanner. Its start row is rewritten as the scanner moves between
  // regions (getScannerCallable) and when the scanner is reset after an error (loadCache).
  protected Scan scan;
  // Once true, loadCache() returns immediately and nextWithSyncCache() returns null after the
  // cache drains. Presumably set by close() -- not visible here.
  protected boolean closed = false;
  // Current region scanner is against.  Gets cleared if current region goes
  // wonky: e.g. if it splits on us.
  protected HRegionInfo currentRegion = null;
  // Callable for the currently open region scanner. Null before the first region is opened,
  // after nextScanner() closes it, and after loadCache() resets the scanner on an error.
  protected ScannerCallableWithReplicas callable = null;
  // Results fetched from the server but not yet handed to the caller. Created by the
  // subclass's initCache() implementation (e.g. initSyncCache()).
  protected Queue<Result> cache;
  /**
   * A list of partial results that have been returned from the server. This list should only
   * contain results if this scanner does not have enough partial results to form the complete
   * result.
   */
  protected final LinkedList<Result> partialResults = new LinkedList<Result>();
  /**
   * The row for which we are accumulating partial Results (i.e. the row of the Results stored
   * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync
   * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
   */
  protected byte[] partialResultsRow = null;
  // Rows requested per RPC and the per-loadCache() row budget: Scan#getCaching() when
  // positive, otherwise the configured client default.
  protected final int caching;
  // System.currentTimeMillis() at construction and after each successful scanner RPC in
  // loadCache(); used to detect scanner-lease timeouts and for scan metrics.
  protected long lastNext;
  // Keep lastResult returned successfully in case we have to reset scanner.
  protected Result lastResult = null;
  // Per-loadCache() budget of estimated result heap size: Scan#getMaxResultSize() when
  // positive, otherwise the configured client default.
  protected final long maxScannerResultSize;
  private final ClusterConnection connection;
  private final TableName tableName;
  // Scanner timeout in milliseconds (compared against System.currentTimeMillis() deltas).
  protected final int scannerTimeout;
  // True once writeScanMetrics() has stored the metrics in the Scan; prevents re-publishing.
  protected boolean scanMetricsPublished = false;
  // Caller used for every scanner RPC; see call(), which invokes it without retries.
  protected RpcRetryingCaller<Result []> caller;
  protected RpcControllerFactory rpcControllerFactory;
  protected Configuration conf;
  // The timeout on the primary. Applicable if there are multiple replicas for a region.
  // In that case, we will only wait for this much timeout on the primary before going
  // to the replicas and trying the same scan. Note that the retries will still happen
  // on each replica and the first successful results will be taken. A timeout of 0 is
  // disallowed.
  protected final int primaryOperationTimeout;
  // Client retry count (HBASE_CLIENT_RETRIES_NUMBER), handed to each ScannerCallableWithReplicas.
  private int retries;
  // Executor handed to each ScannerCallableWithReplicas.
  protected final ExecutorService pool;
100 
101   /**
102    * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
103    * row maybe changed changed.
104    * @param conf The {@link Configuration} to use.
105    * @param scan {@link Scan} to use in this scanner
106    * @param tableName The table that we wish to scan
107    * @param connection Connection identifying the cluster
108    * @throws IOException
109    */
110   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
111       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
112       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
113       throws IOException {
114       if (LOG.isTraceEnabled()) {
115         LOG.trace("Scan table=" + tableName
116             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
117       }
118       this.scan = scan;
119       this.tableName = tableName;
120       this.lastNext = System.currentTimeMillis();
121       this.connection = connection;
122       this.pool = pool;
123       this.primaryOperationTimeout = primaryOperationTimeout;
124       this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
125           HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
126       if (scan.getMaxResultSize() > 0) {
127         this.maxScannerResultSize = scan.getMaxResultSize();
128       } else {
129         this.maxScannerResultSize = conf.getLong(
130           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
131           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
132       }
133       this.scannerTimeout = HBaseConfiguration.getInt(conf,
134         HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
135         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
136         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
137 
138       // check if application wants to collect scan metrics
139       initScanMetrics(scan);
140 
141       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
142       if (this.scan.getCaching() > 0) {
143         this.caching = this.scan.getCaching();
144       } else {
145         this.caching = conf.getInt(
146             HConstants.HBASE_CLIENT_SCANNER_CACHING,
147             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
148       }
149 
150       this.caller = rpcFactory.<Result[]> newCaller();
151       this.rpcControllerFactory = controllerFactory;
152 
153       this.conf = conf;
154       initCache();
155       initializeScannerInConstruction();
156     }
157 
158     protected abstract void initCache();
159 
160     protected void initializeScannerInConstruction() throws IOException{
161       // initialize the scanner
162       nextScanner(this.caching, false);
163     }
164 
165     protected ClusterConnection getConnection() {
166       return this.connection;
167     }
168 
169     protected TableName getTable() {
170       return this.tableName;
171     }
172 
173     protected int getRetries() {
174       return this.retries;
175     }
176 
177     protected int getScannerTimeout() {
178       return this.scannerTimeout;
179     }
180 
181     protected Configuration getConf() {
182       return this.conf;
183     }
184 
185     protected Scan getScan() {
186       return scan;
187     }
188 
189     protected ExecutorService getPool() {
190       return pool;
191     }
192 
193     protected int getPrimaryOperationTimeout() {
194       return primaryOperationTimeout;
195     }
196 
197     protected int getCaching() {
198       return caching;
199     }
200 
201     protected long getTimestamp() {
202       return lastNext;
203     }
204 
205     @VisibleForTesting
206     protected long getMaxResultSize() {
207       return maxScannerResultSize;
208     }
209 
210     // returns true if the passed region endKey
211     protected boolean checkScanStopRow(final byte [] endKey) {
212       if (this.scan.getStopRow().length > 0) {
213         // there is a stop row, check to see if we are past it.
214         byte [] stopRow = scan.getStopRow();
215         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
216           endKey, 0, endKey.length);
217         if (cmp <= 0) {
218           // stopRow <= endKey (endKey is equals to or larger than stopRow)
219           // This is a stop.
220           return true;
221         }
222       }
223       return false; //unlikely.
224     }
225 
226     private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
227       // If we have just switched replica, don't go to the next scanner yet. Rather, try
228       // the scanner operations on the new replica, from the right point in the scan
229       // Note that when we switched to a different replica we left it at a point
230       // where we just did the "openScanner" with the appropriate startrow
231       if (callable != null && callable.switchedToADifferentReplica()) return true;
232       return nextScanner(nbRows, done);
233     }
234 
235     /*
236      * Gets a scanner for the next region.  If this.currentRegion != null, then
237      * we will move to the endrow of this.currentRegion.  Else we will get
238      * scanner at the scan.getStartRow().  We will go no further, just tidy
239      * up outstanding scanners, if <code>currentRegion != null</code> and
240      * <code>done</code> is true.
241      * @param nbRows
242      * @param done Server-side says we're done scanning.
243      */
244   protected boolean nextScanner(int nbRows, final boolean done)
245     throws IOException {
246       // Close the previous scanner if it's open
247       if (this.callable != null) {
248         this.callable.setClose();
249         call(callable, caller, scannerTimeout);
250         this.callable = null;
251       }
252 
253       // Where to start the next scanner
254       byte [] localStartKey;
255 
256       // if we're at end of table, close and return false to stop iterating
257       if (this.currentRegion != null) {
258         byte [] endKey = this.currentRegion.getEndKey();
259         if (endKey == null ||
260             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
261             checkScanStopRow(endKey) ||
262             done) {
263           close();
264           if (LOG.isTraceEnabled()) {
265             LOG.trace("Finished " + this.currentRegion);
266           }
267           return false;
268         }
269         localStartKey = endKey;
270         if (LOG.isTraceEnabled()) {
271           LOG.trace("Finished " + this.currentRegion);
272         }
273       } else {
274         localStartKey = this.scan.getStartRow();
275       }
276 
277       if (LOG.isDebugEnabled() && this.currentRegion != null) {
278         // Only worth logging if NOT first region in scan.
279         LOG.debug("Advancing internal scanner to startKey at '" +
280           Bytes.toStringBinary(localStartKey) + "'");
281       }
282       try {
283         callable = getScannerCallable(localStartKey, nbRows);
284         // Open a scanner on the region server starting at the
285         // beginning of the region
286         call(callable, caller, scannerTimeout);
287         this.currentRegion = callable.getHRegionInfo();
288         if (this.scanMetrics != null) {
289           this.scanMetrics.countOfRegions.incrementAndGet();
290         }
291       } catch (IOException e) {
292         close();
293         throw e;
294       }
295       return true;
296     }
297 
298   @VisibleForTesting
299   boolean isAnyRPCcancelled() {
300     return callable.isAnyRPCcancelled();
301   }
302 
303   Result[] call(ScannerCallableWithReplicas callable,
304       RpcRetryingCaller<Result[]> caller, int scannerTimeout)
305       throws IOException, RuntimeException {
306     if (Thread.interrupted()) {
307       throw new InterruptedIOException();
308     }
309     // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
310     // we do a callWithRetries
311     return caller.callWithoutRetries(callable, scannerTimeout);
312   }
313 
314     @InterfaceAudience.Private
315     protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
316         int nbRows) {
317       scan.setStartRow(localStartKey);
318       ScannerCallable s =
319           new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
320               this.rpcControllerFactory);
321       s.setCaching(nbRows);
322       ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
323        s, pool, primaryOperationTimeout, scan,
324        retries, scannerTimeout, caching, conf, caller);
325       return sr;
326     }
327 
328     /**
329      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
330      * application or TableInputFormat.Later, we could push it to other systems. We don't use
331      * metrics framework because it doesn't support multi-instances of the same metrics on the same
332      * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
333      *
334      * By default, scan metrics are disabled; if the application wants to collect them, this
335      * behavior can be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
336      *
337      * <p>This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
338      */
339     protected void writeScanMetrics() {
340       if (this.scanMetrics == null || scanMetricsPublished) {
341         return;
342       }
343       MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
344       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
345       scanMetricsPublished = true;
346     }
347 
348     protected void initSyncCache() {
349     cache = new LinkedList<Result>();
350   }
351 
352     protected Result nextWithSyncCache() throws IOException {
353       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
354       if (cache.size() == 0 && this.closed) {
355         return null;
356       }
357       if (cache.size() == 0) {
358         loadCache();
359       }
360 
361       if (cache.size() > 0) {
362         return cache.poll();
363       }
364 
365       // if we exhausted this scanner before calling close, write out the scan metrics
366       writeScanMetrics();
367       return null;
368     }
369 
370   @VisibleForTesting
371   public int getCacheSize() {
372     return cache != null ? cache.size() : 0;
373   }
374 
  /**
   * Contact the servers to load more {@link Result}s in the cache.
   * <p>
   * Issues scanner RPCs on the current region, moving to following regions as they are
   * exhausted. The loop stops when one of the following happens:
   * <ul>
   * <li>the row budget ({@code caching}) or the size budget ({@code maxScannerResultSize}) for
   * this call is used up;</li>
   * <li>the server reports that the current region has more results;</li>
   * <li>a heartbeat response arrives while the cache is non-empty;</li>
   * <li>there is no further region to scan.</li>
   * </ul>
   * While partial Results for a row are being accumulated, the loop does not move to another
   * region. Some exceptions cause the scanner to be reset and reopened just after
   * {@code lastResult} instead of being rethrown.
   * @throws IOException if an RPC fails in a way that is not recovered by resetting the scanner.
   *         A ScannerTimeoutException is thrown when an UnknownScannerException arrives more
   *         than {@code scannerTimeout} ms after the last successful call.
   */
  protected void loadCache() throws IOException {
    // check if scanner was closed during previous prefetch
    if (closed) return;
    Result[] values = null;
    // Per-call budgets, decremented as Results are added to the cache below.
    long remainingResultSize = maxScannerResultSize;
    int countdown = this.caching;

    // A callable created by nextScanner() from within this loop was given the remaining
    // countdown as its caching; restore the full caching for this new load.
    callable.setCaching(this.caching);
    // Allows one scanner reset after an OutOfOrderScannerNextException. A second one with no
    // successful call in between is rethrown. Reset to true after every successful call.
    boolean retryAfterOutOfOrderException = true;
    // We don't expect that the server will have more results for us if
    // it doesn't tell us otherwise. We rely on the size or count of results
    boolean serverHasMoreResults = false;
    do {
      try {
        // The server returns null values if scanning is to stop. Otherwise it
        // returns an empty array if scanning is to go on and we've just
        // exhausted the current region.
        values = call(callable, caller, scannerTimeout);
        // When the replica switch happens, we need to do certain operations
        // again. The callable will openScanner with the right startkey
        // but we need to pick up from there. Bypass the rest of the loop
        // and let the catch-up happen in the beginning of the loop as it
        // happens for the cases where we see exceptions. Since only openScanner
        // would have happened, values would be null
        if (values == null && callable.switchedToADifferentReplica()) {
          // Any accumulated partial results are no longer valid since the callable will
          // openScanner with the correct startkey and we must pick up from there
          clearPartialResults();
          this.currentRegion = callable.getHRegionInfo();
          // In a do-while, 'continue' still evaluates the loop condition below. There,
          // possiblyNextScanner() returns true without changing regions because the callable
          // just switched replicas.
          continue;
        }
        retryAfterOutOfOrderException = true;
      } catch (DoNotRetryIOException e) {
        // An exception was thrown which makes any partial results that we were collecting
        // invalid. The scanner will need to be reset to the beginning of a row.
        clearPartialResults();

        // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
        // to reset the scanner and come back in again.
        if (e instanceof UnknownScannerException) {
          long timeout = lastNext + scannerTimeout;
          // If we are over the timeout, throw this exception to the client wrapped in
          // a ScannerTimeoutException. Else, it's because the region moved and we used the old
          // id against the new region server; reset the scanner.
          if (timeout < System.currentTimeMillis()) {
            LOG.info("For hints related to the following exception, please try taking a look at: " +
                "https://hbase.apache.org/book.html#trouble.client.scantimeout");
            long elapsed = System.currentTimeMillis() - lastNext;
            ScannerTimeoutException ex =
                new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
                    + "timeout is currently set to " + scannerTimeout);
            ex.initCause(e);
            throw ex;
          }
        } else {
          // If the exception is anything other than the cases listed below, throw it back to
          // the client; otherwise set up the scanner again and retry.
          Throwable cause = e.getCause();
          if ((cause != null && cause instanceof NotServingRegionException) ||
              (cause != null && cause instanceof RegionServerStoppedException) ||
              e instanceof OutOfOrderScannerNextException) {
            // Pass
            // It is easier writing the if loop test as list of what is allowed rather than
            // as a list of what is not allowed... so if in here, it means we do not throw.
          } else {
            throw e;
          }
        }
        // Otherwise, it's a signal from the depths of ScannerCallable that we need to reset
        // the scanner.
        if (this.lastResult != null) {
          // The region has moved. We need to open a brand new scanner at
          // the new location.
          // Reset the startRow to the row we've seen last so that the new
          // scanner starts at the correct row. Otherwise we may see previously
          // returned rows again.
          // (ScannerCallable by now has "relocated" the correct region)
          if (scan.isReversed()) {
            scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
          } else {
            // Appending a 0x00 byte yields the smallest row key strictly after lastResult's row.
            scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
          }
        }
        if (e instanceof OutOfOrderScannerNextException) {
          if (retryAfterOutOfOrderException) {
            retryAfterOutOfOrderException = false;
          } else {
            // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
            throw new DoNotRetryIOException("Failed after retry of " +
                "OutOfOrderScannerNextException: was there a rpc timeout?", e);
          }
        }
        // Clear the region so nextScanner() reopens at scan's (possibly just reset) start row.
        this.currentRegion = null;
        // Null out the callable so nextScanner() does not try to close it with an RPC to the
        // remote server when the exception we got was UnknownScanner or the server is going down.
        callable = null;

        // This continue will take us to while at end of loop where we will set up new scanner.
        continue;
      }
      long currentTime = System.currentTimeMillis();
      if (this.scanMetrics != null) {
        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
      }
      lastNext = currentTime;
      // Groom the array of Results that we received back from the server before adding that
      // Results to the scanner's cache. If partial results are not allowed to be seen by the
      // caller, all book keeping will be performed within this method.
      List<Result> resultsToAddToCache =
          getResultsToAddToCache(values, callable.isHeartbeatMessage());
      if (!resultsToAddToCache.isEmpty()) {
        for (Result rs : resultsToAddToCache) {
          cache.add(rs);
          long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
          countdown--;
          remainingResultSize -= estimatedHeapSizeOfResult;
          addEstimatedSize(estimatedHeapSizeOfResult);
          this.lastResult = rs;
        }
      }

      // Caller of this method just wants a Result. If we see a heartbeat message, it means
      // processing of the scan is taking a long time server side. Rather than continue to
      // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
      // unnecessary delays to the caller
      if (callable.isHeartbeatMessage() && cache.size() > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Heartbeat message received and cache contains Results."
              + " Breaking out of scan loop");
        }
        break;
      }

      // We expect that the server won't have more results for us when we exhaust
      // the size (bytes or count) of the results returned. If the server *does* inform us that
      // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
      // get results is the moreResults context valid.
      if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
        // Only adhere to more server results when we don't have any partialResults
        // as it keeps the outer loop logic the same.
        serverHasMoreResults = callable.getServerHasMoreResults() & partialResults.isEmpty();
      }
      // Values == null means server-side filter has determined we must STOP
      // !partialResults.isEmpty() means that we are still accumulating partial Results for a
      // row. We should not change scanners before we receive all the partial Results for that
      // row.
    } while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
        && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)));
  }
531 
532   /**
533    * @param remainingResultSize
534    * @param remainingRows
535    * @param regionHasMoreResults
536    * @return true when the current region has been exhausted. When the current region has been
537    *         exhausted, the region must be changed before scanning can continue
538    */
539   private boolean doneWithRegion(long remainingResultSize, int remainingRows,
540       boolean regionHasMoreResults) {
541     return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
542   }
543 
544   protected long calcEstimatedSize(Result rs) {
545     long estimatedHeapSizeOfResult = 0;
546     // We don't make Iterator here
547     for (Cell cell : rs.rawCells()) {
548       estimatedHeapSizeOfResult += CellUtil.estimatedHeapSizeOf(cell);
549     }
550     return estimatedHeapSizeOfResult;
551   }
552 
553   protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
554     return;
555   }
556 
557   @VisibleForTesting
558   public int getCacheCount() {
559     return cache != null ? cache.size() : 0;
560   }
561 
562   /**
563    * This method ensures all of our book keeping regarding partial results is kept up to date. This
564    * method should be called once we know that the results we received back from the RPC request do
565    * not contain errors. We return a list of results that should be added to the cache. In general,
566    * this list will contain all NON-partial results from the input array (unless the client has
567    * specified that they are okay with receiving partial results)
568    * @param resultsFromServer The array of {@link Result}s returned from the server
569    * @param heartbeatMessage Flag indicating whether or not the response received from the server
570    *          represented a complete response, or a heartbeat message that was sent to keep the
571    *          client-server connection alive
572    * @return the list of results that should be added to the cache.
573    * @throws IOException
574    */
575   protected List<Result>
576       getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
577           throws IOException {
578     int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
579     List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
580 
581     final boolean isBatchSet = scan != null && scan.getBatch() > 0;
582     final boolean allowPartials = scan != null && scan.getAllowPartialResults();
583 
584     // If the caller has indicated in their scan that they are okay with seeing partial results,
585     // then simply add all results to the list. Note that since scan batching also returns results
586     // for a row in pieces we treat batch being set as equivalent to allowing partials. The
587     // implication of treating batching as equivalent to partial results is that it is possible
588     // the caller will receive a result back where the number of cells in the result is less than
589     // the batch size even though it may not be the last group of cells for that row.
590     if (allowPartials || isBatchSet) {
591       addResultsToList(resultsToAddToCache, resultsFromServer, 0,
592           (null == resultsFromServer ? 0 : resultsFromServer.length));
593       return resultsToAddToCache;
594     }
595 
596     // If no results were returned it indicates that either we have the all the partial results
597     // necessary to construct the complete result or the server had to send a heartbeat message
598     // to the client to keep the client-server connection alive
599     if (resultsFromServer == null || resultsFromServer.length == 0) {
600       // If this response was an empty heartbeat message, then we have not exhausted the region
601       // and thus there may be more partials server side that still need to be added to the partial
602       // list before we form the complete Result
603       if (!partialResults.isEmpty() && !heartbeatMessage) {
604         resultsToAddToCache.add(Result.createCompleteResult(partialResults));
605         clearPartialResults();
606       }
607 
608       return resultsToAddToCache;
609     }
610 
611     // In every RPC response there should be at most a single partial result. Furthermore, if
612     // there is a partial result, it is guaranteed to be in the last position of the array.
613     Result last = resultsFromServer[resultsFromServer.length - 1];
614     Result partial = last.isPartial() ? last : null;
615 
616     if (LOG.isTraceEnabled()) {
617       StringBuilder sb = new StringBuilder();
618       sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
619       sb.append("partial != null: ").append(partial != null).append(",");
620       sb.append("number of partials so far: ").append(partialResults.size());
621       LOG.trace(sb.toString());
622     }
623 
624     // There are three possibilities cases that can occur while handling partial results
625     //
626     // 1. (partial != null && partialResults.isEmpty())
627     // This is the first partial result that we have received. It should be added to
628     // the list of partialResults and await the next RPC request at which point another
629     // portion of the complete result will be received
630     //
631     // 2. !partialResults.isEmpty()
632     // Since our partialResults list is not empty it means that we have been accumulating partial
633     // Results for a particular row. We cannot form the complete/whole Result for that row until
634     // all partials for the row have been received. Thus we loop through all of the Results
635     // returned from the server and determine whether or not all partial Results for the row have
636     // been received. We know that we have received all of the partial Results for the row when:
637     // i) We notice a row change in the Results
638     // ii) We see a Result for the partial row that is NOT marked as a partial Result
639     //
640     // 3. (partial == null && partialResults.isEmpty())
641     // Business as usual. We are not accumulating partial results and there wasn't a partial result
642     // in the RPC response. This means that all of the results we received from the server are
643     // complete and can be added directly to the cache
644     if (partial != null && partialResults.isEmpty()) {
645       addToPartialResults(partial);
646 
647       // Exclude the last result, it's a partial
648       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
649     } else if (!partialResults.isEmpty()) {
650       for (int i = 0; i < resultsFromServer.length; i++) {
651         Result result = resultsFromServer[i];
652 
653         // This result is from the same row as the partial Results. Add it to the list of partials
654         // and check if it was the last partial Result for that row
655         if (Bytes.equals(partialResultsRow, result.getRow())) {
656           addToPartialResults(result);
657 
658           // If the result is not a partial, it is a signal to us that it is the last Result we
659           // need to form the complete Result client-side
660           if (!result.isPartial()) {
661             resultsToAddToCache.add(Result.createCompleteResult(partialResults));
662             clearPartialResults();
663           }
664         } else {
665           // The row of this result differs from the row of the partial results we have received so
666           // far. If our list of partials isn't empty, this is a signal to form the complete Result
667           // since the row has now changed
668           if (!partialResults.isEmpty()) {
669             resultsToAddToCache.add(Result.createCompleteResult(partialResults));
670             clearPartialResults();
671           }
672 
673           // It's possible that in one response from the server we receive the final partial for
674           // one row and receive a partial for a different row. Thus, make sure that all Results
675           // are added to the proper list
676           if (result.isPartial()) {
677             addToPartialResults(result);
678           } else {
679             resultsToAddToCache.add(result);
680           }
681         }
682       }
683     } else { // partial == null && partialResults.isEmpty() -- business as usual
684       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
685     }
686 
687     return resultsToAddToCache;
688   }
689 
690   /**
691    * A convenience method for adding a Result to our list of partials. This method ensure that only
692    * Results that belong to the same row as the other partials can be added to the list.
693    * @param result The result that we want to add to our list of partial Results
694    * @throws IOException
695    */
696   private void addToPartialResults(final Result result) throws IOException {
697     final byte[] row = result.getRow();
698     if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
699       throw new IOException("Partial result row does not match. All partial results must come "
700           + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: "
701           + Bytes.toString(row));
702     }
703     partialResultsRow = row;
704     partialResults.add(result);
705   }
706 
707   /**
708    * Convenience method for clearing the list of partials and resetting the partialResultsRow.
709    */
710   private void clearPartialResults() {
711     partialResults.clear();
712     partialResultsRow = null;
713   }
714 
715   /**
716    * Helper method for adding results between the indices [start, end) to the outputList
717    * @param outputList the list that results will be added to
718    * @param inputArray the array that results are taken from
719    * @param start beginning index (inclusive)
720    * @param end ending index (exclusive)
721    */
722   private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
723     if (inputArray == null || start < 0 || end > inputArray.length) return;
724 
725     for (int i = start; i < end; i++) {
726       outputList.add(inputArray[i]);
727     }
728   }
729 
730     @Override
731     public void close() {
732       if (!scanMetricsPublished) writeScanMetrics();
733       if (callable != null) {
734         callable.setClose();
735         try {
736           call(callable, caller, scannerTimeout);
737         } catch (UnknownScannerException e) {
738            // We used to catch this error, interpret, and rethrow. However, we
739            // have since decided that it's not nice for a scanner's close to
740            // throw exceptions. Chances are it was just due to lease time out.
741           if (LOG.isDebugEnabled()) {
742             LOG.debug("scanner failed to close", e);
743           }
744         } catch (IOException e) {
745           /* An exception other than UnknownScanner is unexpected. */
746           LOG.warn("scanner failed to close.", e);
747         }
748         callable = null;
749       }
750       closed = true;
751     }
752 
753   /**
754    * Create the closest row before the specified row
755    * @param row
756    * @return a new byte array which is the closest front row of the specified one
757    */
758   protected static byte[] createClosestRowBefore(byte[] row) {
759     if (row == null) {
760       throw new IllegalArgumentException("The passed row is empty");
761     }
762     if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
763       return MAX_BYTE_ARRAY;
764     }
765     if (row[row.length - 1] == 0) {
766       return Arrays.copyOf(row, row.length - 1);
767     } else {
768       byte[] closestFrontRow = Arrays.copyOf(row, row.length);
769       closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
770       closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
771       return closestFrontRow;
772     }
773   }
774 
775   @Override
776   public boolean renewLease() {
777     if (callable != null) {
778       // do not return any rows, do not advance the scanner
779       callable.setCaching(0);
780       try {
781         this.caller.callWithoutRetries(callable, this.scannerTimeout);
782       } catch (Exception e) {
783         return false;
784       } finally {
785         callable.setCaching(this.caching);
786       }
787       return true;
788     }
789     return false;
790   }
791 }