View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.concurrent.ExecutorService;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.DoNotRetryIOException;
34  import org.apache.hadoop.hbase.HBaseConfiguration;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HRegionInfo;
37  import org.apache.hadoop.hbase.NotServingRegionException;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.UnknownScannerException;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
42  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
43  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
44  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
45  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
46  import org.apache.hadoop.hbase.util.Bytes;
47  
48  import com.google.common.annotations.VisibleForTesting;
49  
/**
 * Implements the scanner interface for the HBase client.
 * If there are multiple regions in a table, this scanner will iterate
 * through them all.
 */
@InterfaceAudience.Private
public class ClientScanner extends AbstractClientScanner {
    private final Log LOG = LogFactory.getLog(this.getClass());
    // A byte array in which all elements are the max byte, and it is used to
    // construct closest front row
    static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
    // The scan being iterated; its start row is advanced as regions are exhausted
    // (see nextScanner/getScannerCallable), so callers should not reuse this Scan.
    protected Scan scan;
    // Set by close(); next() becomes a no-op once closed and the cache is drained.
    protected boolean closed = false;
    // Current region scanner is against.  Gets cleared if current region goes
    // wonky: e.g. if it splits on us.
    protected HRegionInfo currentRegion = null;
    // RPC callable for the current region; null between regions and after close().
    protected ScannerCallableWithReplicas callable = null;
    // Results fetched from the server but not yet handed out by next().
    protected final LinkedList<Result> cache = new LinkedList<Result>();
    /**
     * A list of partial results that have been returned from the server. This list should only
     * contain results if this scanner does not have enough partial results to form the complete
     * result.
     */
    protected final LinkedList<Result> partialResults = new LinkedList<Result>();
    // Number of rows to fetch per RPC (from Scan#getCaching or configuration).
    protected final int caching;
    // Timestamp (ms) of the last successful server call; used for timeout detection in next().
    protected long lastNext;
    // Keep lastResult returned successfully in case we have to reset scanner.
    protected Result lastResult = null;
    // Upper bound (bytes, estimated heap size) fetched per next() loop.
    protected final long maxScannerResultSize;
    private final ClusterConnection connection;
    private final TableName tableName;
    // Scanner lease/timeout period in milliseconds (compared against currentTimeMillis).
    protected final int scannerTimeout;
    // Ensures scan metrics are written back to the Scan attributes at most once.
    protected boolean scanMetricsPublished = false;
    protected RpcRetryingCaller<Result []> caller;
    protected RpcControllerFactory rpcControllerFactory;
    protected Configuration conf;
    //The timeout on the primary. Applicable if there are multiple replicas for a region
    //In that case, we will only wait for this much timeout on the primary before going
    //to the replicas and trying the same scan. Note that the retries will still happen
    //on each replica and the first successful results will be taken. A timeout of 0 is
    //disallowed.
    protected final int primaryOperationTimeout;
    private int retries;
    protected final ExecutorService pool;

  /**
   * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s start
   * row may be changed as the scan advances through regions.
   * @param conf The {@link Configuration} to use.
   * @param scan {@link Scan} to use in this scanner
   * @param tableName The table that we wish to scan
   * @param connection Connection identifying the cluster
   * @param rpcFactory factory used to create the {@link RpcRetryingCaller} for scan RPCs
   * @param controllerFactory factory for per-RPC controllers
   * @param pool thread pool used when scanning region replicas
   * @param primaryOperationTimeout how long to wait on the primary replica before also
   *          trying the other replicas; 0 is disallowed (see field comment)
   * @throws IOException if the initial region scanner cannot be opened
   */
  public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
      throws IOException {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Scan table=" + tableName
            + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
      }
      this.scan = scan;
      this.tableName = tableName;
      this.lastNext = System.currentTimeMillis();
      this.connection = connection;
      this.pool = pool;
      this.primaryOperationTimeout = primaryOperationTimeout;
      this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      // Scan-level max result size takes precedence over the configured default.
      if (scan.getMaxResultSize() > 0) {
        this.maxScannerResultSize = scan.getMaxResultSize();
      } else {
        this.maxScannerResultSize = conf.getLong(
          HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
          HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
      }
      this.scannerTimeout = HBaseConfiguration.getInt(conf,
        HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

      // check if application wants to collect scan metrics
      initScanMetrics(scan);

      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
      if (this.scan.getCaching() > 0) {
        this.caching = this.scan.getCaching();
      } else {
        this.caching = conf.getInt(
            HConstants.HBASE_CLIENT_SCANNER_CACHING,
            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
      }

      this.caller = rpcFactory.<Result[]> newCaller();
      this.rpcControllerFactory = controllerFactory;

      this.conf = conf;
      // NOTE(review): this invokes an overridable method from the constructor;
      // subclasses overriding it must not depend on their own fields being set yet.
      initializeScannerInConstruction();
    }
150 
    /**
     * Opens the scanner against the first region. Invoked from the constructor, so
     * overriding subclasses must not rely on their own state being initialized yet.
     * @throws IOException if the first region scanner cannot be opened
     */
    protected void initializeScannerInConstruction() throws IOException{
      // initialize the scanner
      nextScanner(this.caching, false);
    }
155 
    /** @return the cluster connection this scanner operates against */
    protected ClusterConnection getConnection() {
      return this.connection;
    }

    /**
     * @return Table name
     * @deprecated Since 0.96.0; use {@link #getTable()}
     */
    @Deprecated
    protected byte [] getTableName() {
      return this.tableName.getName();
    }

    /** @return the name of the table being scanned */
    protected TableName getTable() {
      return this.tableName;
    }

    /** @return number of client retries configured for scan RPCs */
    protected int getRetries() {
      return this.retries;
    }

    /** @return the scanner lease/timeout period in milliseconds */
    protected int getScannerTimeout() {
      return this.scannerTimeout;
    }

    /** @return the configuration the scanner was created with */
    protected Configuration getConf() {
      return this.conf;
    }

    /** @return the (mutated-in-place) Scan driving this scanner */
    protected Scan getScan() {
      return scan;
    }

    /** @return the thread pool used for replica scans */
    protected ExecutorService getPool() {
      return pool;
    }

    /** @return timeout (on the primary replica) before replica fallback kicks in */
    protected int getPrimaryOperationTimeout() {
      return primaryOperationTimeout;
    }

    /** @return number of rows fetched per RPC */
    protected int getCaching() {
      return caching;
    }

    /** @return timestamp (ms) of the last successful server call */
    protected long getTimestamp() {
      return lastNext;
    }
204 
205     // returns true if the passed region endKey
206     protected boolean checkScanStopRow(final byte [] endKey) {
207       if (this.scan.getStopRow().length > 0) {
208         // there is a stop row, check to see if we are past it.
209         byte [] stopRow = scan.getStopRow();
210         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
211           endKey, 0, endKey.length);
212         if (cmp <= 0) {
213           // stopRow <= endKey (endKey is equals to or larger than stopRow)
214           // This is a stop.
215           return true;
216         }
217       }
218       return false; //unlikely.
219     }
220 
221     private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
222       // If we have just switched replica, don't go to the next scanner yet. Rather, try
223       // the scanner operations on the new replica, from the right point in the scan
224       // Note that when we switched to a different replica we left it at a point
225       // where we just did the "openScanner" with the appropriate startrow
226       if (callable != null && callable.switchedToADifferentReplica()) return true;
227       return nextScanner(nbRows, done);
228     }
229 
    /**
     * Gets a scanner for the next region.  If this.currentRegion != null, then
     * we will move to the endrow of this.currentRegion.  Else we will get
     * scanner at the scan.getStartRow().  We will go no further, just tidy
     * up outstanding scanners, if <code>currentRegion != null</code> and
     * <code>done</code> is true.
     * @param nbRows caching hint for the new scanner
     * @param done Server-side says we're done scanning.
     * @return false when the scan is finished (end of table, past stop row, or done);
     *         true when a scanner on the next region was opened
     * @throws IOException if closing the old scanner or opening the new one fails
     */
  protected boolean nextScanner(int nbRows, final boolean done)
    throws IOException {
      // Close the previous scanner if it's open
      if (this.callable != null) {
        this.callable.setClose();
        call(callable, caller, scannerTimeout);
        this.callable = null;
      }

      // Where to start the next scanner
      byte [] localStartKey;

      // if we're at end of table, close and return false to stop iterating
      if (this.currentRegion != null) {
        byte [] endKey = this.currentRegion.getEndKey();
        // An empty end key marks the last region of the table.
        if (endKey == null ||
            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
            checkScanStopRow(endKey) ||
            done) {
          close();
          if (LOG.isTraceEnabled()) {
            LOG.trace("Finished " + this.currentRegion);
          }
          return false;
        }
        // The next region starts exactly where the current one ends.
        localStartKey = endKey;
        if (LOG.isTraceEnabled()) {
          LOG.trace("Finished " + this.currentRegion);
        }
      } else {
        // First region: start from the scan's configured start row.
        localStartKey = this.scan.getStartRow();
      }

      if (LOG.isDebugEnabled() && this.currentRegion != null) {
        // Only worth logging if NOT first region in scan.
        LOG.debug("Advancing internal scanner to startKey at '" +
          Bytes.toStringBinary(localStartKey) + "'");
      }
      try {
        callable = getScannerCallable(localStartKey, nbRows);
        // Open a scanner on the region server starting at the
        // beginning of the region
        call(callable, caller, scannerTimeout);
        this.currentRegion = callable.getHRegionInfo();
        if (this.scanMetrics != null) {
          this.scanMetrics.countOfRegions.incrementAndGet();
        }
      } catch (IOException e) {
        // Clean up (lease, metrics) before propagating; the scanner is unusable now.
        close();
        throw e;
      }
      return true;
    }
292 
  /**
   * @return true if any RPC issued through the current callable was cancelled.
   *         NOTE(review): assumes {@code callable} is non-null, i.e. a scanner is open.
   */
  @VisibleForTesting
  boolean isAnyRPCcancelled() {
    return callable.isAnyRPCcancelled();
  }
297 
  /**
   * Issues a single scan RPC through the caller without retries at this layer.
   * @param callable the replica-aware scanner callable to invoke
   * @param caller the RPC caller used for the invocation
   * @param scannerTimeout operation timeout handed to the caller
   * @return results from the server; may be null (stop) or empty (region exhausted)
   * @throws InterruptedIOException if the current thread was interrupted; note that
   *         Thread.interrupted() also clears the thread's interrupt flag here
   * @throws IOException if the RPC fails
   */
  static Result[] call(ScannerCallableWithReplicas callable,
      RpcRetryingCaller<Result[]> caller, int scannerTimeout)
      throws IOException, RuntimeException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
    // we do a callWithRetries
    return caller.callWithoutRetries(callable, scannerTimeout);
  }
308 
309     @InterfaceAudience.Private
310     protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
311         int nbRows) {
312       scan.setStartRow(localStartKey);
313       ScannerCallable s =
314           new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
315               this.rpcControllerFactory);
316       s.setCaching(nbRows);
317       ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
318        s, pool, primaryOperationTimeout, scan,
319        retries, scannerTimeout, caching, conf, caller);
320       return sr;
321     }
322 
    /**
     * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
     * application or TableInputFormat. Later, we could push it to other systems. We don't use
     * metrics framework because it doesn't support multi-instances of the same metrics on the same
     * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
     *
     * By default, scan metrics are disabled; if the application wants to collect them, this
     * behavior can be turned on by calling {@link Scan#setScanMetricsEnabled(boolean)}
     *
     * <p>This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
     */
    protected void writeScanMetrics() {
      // Publish at most once per scanner, and only when metrics collection is on.
      if (this.scanMetrics == null || scanMetricsPublished) {
        return;
      }
      MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
      scanMetricsPublished = true;
    }
342 
    /**
     * Returns the next complete Result, fetching batches from the server as needed.
     * Handles scanner resets (region move/split, lease expiry, replica switch) and
     * partial-result accumulation.
     * @return the next Result, or null when the scan is exhausted
     * @throws IOException on unrecoverable scan failure
     */
    @Override
    public Result next() throws IOException {
      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
      if (cache.size() == 0 && this.closed) {
        return null;
      }
      if (cache.size() == 0) {
        Result[] values = null;
        long remainingResultSize = maxScannerResultSize;
        int countdown = this.caching;

        // We need to reset it if it's a new callable that was created
        // with a countdown in nextScanner
        callable.setCaching(this.caching);
        // This flag is set when we want to skip the result returned. We do
        // this when we reset scanner because it split under us.
        boolean retryAfterOutOfOrderException = true;
        do {
          try {
            // Server returns a null values if scanning is to stop. Else,
            // returns an empty array if scanning is to go on and we've just
            // exhausted current region.
            values = call(callable, caller, scannerTimeout);

            // When the replica switch happens, we need to do certain operations
            // again. The callable will openScanner with the right startkey
            // but we need to pick up from there. Bypass the rest of the loop
            // and let the catch-up happen in the beginning of the loop as it
            // happens for the cases where we see exceptions. Since only openScanner
            // would have happened, values would be null
            if (values == null && callable.switchedToADifferentReplica()) {
              this.currentRegion = callable.getHRegionInfo();
              continue;
            }
            // A successful call re-arms the one-shot OutOfOrder retry budget.
            retryAfterOutOfOrderException = true;
          } catch (DoNotRetryIOException e) {
            // An exception was thrown which makes any partial results that we were collecting
            // invalid. The scanner will need to be reset to the beginning of a row.
            partialResults.clear();

            // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
            // to reset the scanner and come back in again.
            if (e instanceof UnknownScannerException) {
              long timeout = lastNext + scannerTimeout;
              // If we are over the timeout, throw this exception to the client wrapped in
              // a ScannerTimeoutException. Else, it's because the region moved and we used the old
              // id against the new region server; reset the scanner.
              if (timeout < System.currentTimeMillis()) {
                long elapsed = System.currentTimeMillis() - lastNext;
                ScannerTimeoutException ex =
                    new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
                        + "timeout is currently set to " + scannerTimeout);
                ex.initCause(e);
                throw ex;
              }
            } else {
              // If exception is any but the list below throw it back to the client; else setup
              // the scanner and retry.
              Throwable cause = e.getCause();
              if ((cause != null && cause instanceof NotServingRegionException) ||
                  (cause != null && cause instanceof RegionServerStoppedException) ||
                  e instanceof OutOfOrderScannerNextException) {
                // Pass
                // It is easier writing the if loop test as list of what is allowed rather than
                // as a list of what is not allowed... so if in here, it means we do not throw.
              } else {
                throw e;
              }
            }
            // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
            if (this.lastResult != null) {
              // The region has moved. We need to open a brand new scanner at
              // the new location.
              // Reset the startRow to the row we've seen last so that the new
              // scanner starts at the correct row. Otherwise we may see previously
              // returned rows again.
              // (ScannerCallable by now has "relocated" the correct region)
              if (scan.isReversed()) {
                scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
              } else {
                // Appending a zero byte gives the smallest row strictly after lastResult.
                scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
              }
            }
            if (e instanceof OutOfOrderScannerNextException) {
              if (retryAfterOutOfOrderException) {
                retryAfterOutOfOrderException = false;
              } else {
                // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
                throw new DoNotRetryIOException("Failed after retry of " +
                    "OutOfOrderScannerNextException: was there a rpc timeout?", e);
              }
            }
            // Clear region.
            this.currentRegion = null;
            // Set this to zero so we don't try and do an rpc and close on remote server when
            // the exception we got was UnknownScanner or the Server is going down.
            callable = null;

            // This continue will take us to while at end of loop where we will set up new scanner.
            continue;
          }
          long currentTime = System.currentTimeMillis();
          if (this.scanMetrics != null) {
            this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
          }
          lastNext = currentTime;
          // Groom the array of Results that we received back from the server before adding that
          // Results to the scanner's cache. If partial results are not allowed to be seen by the
          // caller, all book keeping will be performed within this method.
          List<Result> resultsToAddToCache = getResultsToAddToCache(values);
          if (!resultsToAddToCache.isEmpty()) {
            for (Result rs : resultsToAddToCache) {
              cache.add(rs);
              // We don't make Iterator here
              for (Cell cell : rs.rawCells()) {
                remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
              }
              countdown--;
              this.lastResult = rs;
            }
          }
          // Values == null means server-side filter has determined we must STOP
          // !partialResults.isEmpty() means that we are still accumulating partial Results for a
          // row. We should not change scanners before we receive all the partial Results for that
          // row.
        } while (remainingResultSize > 0 && countdown > 0
            && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)));
      }

      if (cache.size() > 0) {
        return cache.poll();
      }

      // if we exhausted this scanner before calling close, write out the scan metrics
      writeScanMetrics();
      return null;
    }
480 
481   @VisibleForTesting
482   public int getCacheSize() {
483     return cache != null ? cache.size() : 0;
484   }
485 
486   /**
487    * This method ensures all of our book keeping regarding partial results is kept up to date. This
488    * method should be called once we know that the results we received back from the RPC request do
489    * not contain errors. We return a list of results that should be added to the cache. In general,
490    * this list will contain all NON-partial results from the input array (unless the client has
491    * specified that they are okay with receiving partial results)
492    * @return the list of results that should be added to the cache.
493    * @throws IOException
494    */
495   protected List<Result> getResultsToAddToCache(Result[] resultsFromServer) throws IOException {
496     int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
497     List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
498 
499     final boolean isBatchSet = scan != null && scan.getBatch() > 0;
500     final boolean allowPartials = scan != null && scan.getAllowPartialResults();
501 
502     // If the caller has indicated in their scan that they are okay with seeing partial results,
503     // then simply add all results to the list. Note that since scan batching also returns results
504     // for a row in pieces we treat batch being set as equivalent to allowing partials. The
505     // implication of treating batching as equivalent to partial results is that it is possible
506     // the caller will receive a result back where the number of cells in the result is less than
507     // the batch size even though it may not be the last group of cells for that row.
508     if (allowPartials || isBatchSet) {
509       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
510       return resultsToAddToCache;
511     }
512 
513     // If no results were returned it indicates that we have the all the partial results necessary
514     // to construct the complete result.
515     if (resultsFromServer == null || resultsFromServer.length == 0) {
516       if (!partialResults.isEmpty()) {
517         resultsToAddToCache.add(Result.createCompleteResult(partialResults));
518         partialResults.clear();
519       }
520 
521       return resultsToAddToCache;
522     }
523 
524     // In every RPC response there should be at most a single partial result. Furthermore, if
525     // there is a partial result, it is guaranteed to be in the last position of the array.
526     Result last = resultsFromServer[resultsFromServer.length - 1];
527     Result partial = last.isPartial() ? last : null;
528 
529     if (LOG.isTraceEnabled()) {
530       StringBuilder sb = new StringBuilder();
531       sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
532       sb.append("partial != null: ").append(partial != null).append(",");
533       sb.append("number of partials so far: ").append(partialResults.size());
534       LOG.trace(sb.toString());
535     }
536 
537     // There are four possibilities cases that can occur while handling partial results
538     //
539     // 1. (partial != null && partialResults.isEmpty())
540     // This is the first partial result that we have received. It should be added to
541     // the list of partialResults and await the next RPC request at which point another
542     // portion of the complete result will be received
543     //
544     // 2. (partial != null && !partialResults.isEmpty())
545     // a. values.length == 1
546     // Since partialResults contains some elements, it means that we are expecting to receive
547     // the remainder of the complete result within this RPC response. The fact that a partial result
548     // was returned and it's the ONLY result returned indicates that we are still receiving
549     // fragments of the complete result. The Result can be completely formed only when we have
550     // received all of the fragments and thus in this case we simply add the partial result to
551     // our list.
552     //
553     // b. values.length > 1
554     // More than one result has been returned from the server. The fact that we are accumulating
555     // partials in partialList and we just received more than one result back from the server
556     // indicates that the FIRST result we received from the server must be the final fragment that
557     // can be used to complete our result. What this means is that the partial that we received is
558     // a partial result for a different row, and at this point we should combine the existing
559     // partials into a complete result, clear the partialList, and begin accumulating partials for
560     // a new row
561     //
562     // 3. (partial == null && !partialResults.isEmpty())
563     // No partial was received but we are accumulating partials in our list. That means the final
564     // fragment of the complete result will be the first Result in values[]. We use it to create the
565     // complete Result, clear the list, and add it to the list of Results that must be added to the
566     // cache. All other Results in values[] are added after the complete result to maintain proper
567     // ordering
568     //
569     // 4. (partial == null && partialResults.isEmpty())
570     // Business as usual. We are not accumulating partial results and there wasn't a partial result
571     // in the RPC response. This means that all of the results we received from the server are
572     // complete and can be added directly to the cache
573     if (partial != null && partialResults.isEmpty()) {
574       partialResults.add(partial);
575 
576       // Exclude the last result, it's a partial
577       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
578     } else if (partial != null && !partialResults.isEmpty()) {
579       if (resultsFromServer.length > 1) {
580         Result finalResult = resultsFromServer[0];
581         partialResults.add(finalResult);
582         resultsToAddToCache.add(Result.createCompleteResult(partialResults));
583         partialResults.clear();
584 
585         // Exclude first result, it was used to form our complete result
586         // Exclude last result, it's a partial result
587         addResultsToList(resultsToAddToCache, resultsFromServer, 1, resultsFromServer.length - 1);
588       }
589       partialResults.add(partial);
590     } else if (partial == null && !partialResults.isEmpty()) {
591       Result finalResult = resultsFromServer[0];
592       partialResults.add(finalResult);
593       resultsToAddToCache.add(Result.createCompleteResult(partialResults));
594       partialResults.clear();
595 
596       // Exclude the first result, it was used to form our complete result
597       addResultsToList(resultsToAddToCache, resultsFromServer, 1, resultsFromServer.length);
598     } else { // partial == null && partialResults.isEmpty() -- business as usual
599       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
600     }
601 
602     return resultsToAddToCache;
603   }
604 
605   /**
606    * Helper method for adding results between the indices [start, end) to the outputList
607    * @param outputList the list that results will be added to
608    * @param inputArray the array that results are taken from
609    * @param start beginning index (inclusive)
610    * @param end ending index (exclusive)
611    */
612   private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
613     if (inputArray == null || start < 0 || end > inputArray.length) return;
614 
615     for (int i = start; i < end; i++) {
616       outputList.add(inputArray[i]);
617     }
618   }
619 
    /**
     * Closes the scanner: publishes scan metrics (if not already done) and releases
     * the server-side scanner lease. Deliberately never throws; close failures are
     * only logged.
     */
    @Override
    public void close() {
      if (!scanMetricsPublished) writeScanMetrics();
      if (callable != null) {
        callable.setClose();
        try {
          call(callable, caller, scannerTimeout);
        } catch (UnknownScannerException e) {
           // We used to catch this error, interpret, and rethrow. However, we
           // have since decided that it's not nice for a scanner's close to
           // throw exceptions. Chances are it was just due to lease time out.
          if (LOG.isDebugEnabled()) {
            LOG.debug("scanner failed to close", e);
          }
        } catch (IOException e) {
          /* An exception other than UnknownScanner is unexpected. */
          LOG.warn("scanner failed to close.", e);
        }
        callable = null;
      }
      closed = true;
    }
642 
643   /**
644    * Create the closest row before the specified row
645    * @param row
646    * @return a new byte array which is the closest front row of the specified one
647    */
648   protected static byte[] createClosestRowBefore(byte[] row) {
649     if (row == null) {
650       throw new IllegalArgumentException("The passed row is empty");
651     }
652     if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
653       return MAX_BYTE_ARRAY;
654     }
655     if (row[row.length - 1] == 0) {
656       return Arrays.copyOf(row, row.length - 1);
657     } else {
658       byte[] closestFrontRow = Arrays.copyOf(row, row.length);
659       closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
660       closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
661       return closestFrontRow;
662     }
663   }
664 }