1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.apache.hadoop.classification.InterfaceAudience;
23  import org.apache.hadoop.classification.InterfaceStability;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.HConstants;
26  import org.apache.hadoop.hbase.HRegionInfo;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
29  import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
30  import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
31  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
32  import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
33  import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
34  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
36  import org.apache.hadoop.hbase.util.Bytes;
37  
38  import java.io.IOException;
39  import java.util.ArrayList;
40  import java.util.LinkedList;
41  
42  /**
43   * Implements the scanner interface for the HBase client.
44   * If there are multiple regions in a table, this scanner will iterate
45   * through them all.
46   */
47  @InterfaceAudience.Public
48  @InterfaceStability.Stable
49  public class ClientScanner extends AbstractClientScanner {
50      private final Log LOG = LogFactory.getLog(this.getClass());
51      private Scan scan;
52      private boolean closed = false;
53      // Current region scanner is against.  Gets cleared if current region goes
54      // wonky: e.g. if it splits on us.
55      private HRegionInfo currentRegion = null;
56      private ScannerCallable callable = null;
57      private final LinkedList<Result> cache = new LinkedList<Result>();
58      private final int caching;
59      private long lastNext;
60      // Keep lastResult returned successfully in case we have to reset scanner.
61      private Result lastResult = null;
62      private ScanMetrics scanMetrics = null;
63      private final long maxScannerResultSize;
64      private final HConnection connection;
65      private final byte[] tableName;
66      private final int scannerTimeout;
67      private boolean scanMetricsPublished = false;
68      
69      /**
70       * Create a new ClientScanner for the specified table. An HConnection will be
71       * retrieved using the passed Configuration.
72       * Note that the passed {@link Scan}'s start row maybe changed changed.
73       *
74       * @param conf The {@link Configuration} to use.
75       * @param scan {@link Scan} to use in this scanner
76       * @param tableName The table that we wish to scan
77       * @throws IOException
78       */
79      public ClientScanner(final Configuration conf, final Scan scan,
80          final byte[] tableName) throws IOException {
81        this(conf, scan, tableName, HConnectionManager.getConnection(conf));
82      }
83  
84      /**
85       * Create a new ClientScanner for the specified table
86       * Note that the passed {@link Scan}'s start row maybe changed changed.
87       *
88       * @param conf The {@link Configuration} to use.
89       * @param scan {@link Scan} to use in this scanner
90       * @param tableName The table that we wish to scan
91       * @param connection Connection identifying the cluster
92       * @throws IOException
93       */
94      public ClientScanner(final Configuration conf, final Scan scan,
95        final byte[] tableName, HConnection connection) throws IOException {
96        if (LOG.isDebugEnabled()) {
97          LOG.debug("Scan table=" + Bytes.toString(tableName)
98              + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
99        }
100       this.scan = scan;
101       this.tableName = tableName;
102       this.lastNext = System.currentTimeMillis();
103       this.connection = connection;
104       if (scan.getMaxResultSize() > 0) {
105         this.maxScannerResultSize = scan.getMaxResultSize();
106       } else {
107         this.maxScannerResultSize = conf.getLong(
108           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
109           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
110       }
111       this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
112         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
113 
114       // check if application wants to collect scan metrics
115       byte[] enableMetrics = scan.getAttribute(
116         Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
117       if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
118         scanMetrics = new ScanMetrics();
119       }
120 
121       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
122       if (this.scan.getCaching() > 0) {
123         this.caching = this.scan.getCaching();
124       } else {
125         this.caching = conf.getInt(
126             HConstants.HBASE_CLIENT_SCANNER_CACHING,
127             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
128       }
129 
130       // initialize the scanner
131       nextScanner(false);
132     }
133 
134     protected HConnection getConnection() {
135       return this.connection;
136     }
137 
138     protected byte[] getTableName() {
139       return this.tableName;
140     }
141 
142     protected Scan getScan() {
143       return scan;
144     }
145 
146     protected long getTimestamp() {
147       return lastNext;
148     }
149 
150     // returns true if the passed region endKey
151     private boolean checkScanStopRow(final byte [] endKey) {
152       if (this.scan.getStopRow().length > 0) {
153         // there is a stop row, check to see if we are past it.
154         byte [] stopRow = scan.getStopRow();
155         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
156           endKey, 0, endKey.length);
157         if (cmp <= 0) {
158           // stopRow <= endKey (endKey is equals to or larger than stopRow)
159           // This is a stop.
160           return true;
161         }
162       }
163       return false; //unlikely.
164     }
165 
166     /*
167      * Gets a scanner for the next region.  If this.currentRegion != null, then
168      * we will move to the endrow of this.currentRegion.  Else we will get
169      * scanner at the scan.getStartRow().  We will go no further, just tidy
170      * up outstanding scanners, if <code>currentRegion != null</code> and
171      * <code>done</code> is true.
172      * @param done Server-side says we're done scanning.
173      */
174     private boolean nextScanner(final boolean done)
175     throws IOException {
176       // Close the previous scanner if it's open
177       if (this.callable != null) {
178         this.callable.setClose();
179         callable.withRetries();
180         this.callable = null;
181       }
182 
183       // Where to start the next scanner
184       byte [] localStartKey;
185 
186       // if we're at end of table, close and return false to stop iterating
187       if (this.currentRegion != null) {
188         byte [] endKey = this.currentRegion.getEndKey();
189         if (endKey == null ||
190             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
191             checkScanStopRow(endKey) ||
192             done) {
193           close();
194           if (LOG.isDebugEnabled()) {
195             LOG.debug("Finished region=" + this.currentRegion);
196           }
197           return false;
198         }
199         localStartKey = endKey;
200         if (LOG.isDebugEnabled()) {
201           LOG.debug("Finished with region " + this.currentRegion);
202         }
203       } else {
204         localStartKey = this.scan.getStartRow();
205       }
206 
207       if (LOG.isDebugEnabled() && this.currentRegion != null) {
208         // Only worth logging if NOT first region in scan.
209         LOG.debug("Advancing internal scanner to startKey at '" +
210           Bytes.toStringBinary(localStartKey) + "'");
211       }
212       try {
213         callable = getScannerCallable(localStartKey);
214         // Open a scanner on the region server starting at the
215         // beginning of the region
216         callable.withRetries();
217         this.currentRegion = callable.getHRegionInfo();
218         if (this.scanMetrics != null) {
219           this.scanMetrics.countOfRegions.incrementAndGet();
220         }
221       } catch (IOException e) {
222         close();
223         throw e;
224       }
225       return true;
226     }
227 
228     protected ScannerCallable getScannerCallable(byte [] localStartKey) {
229       scan.setStartRow(localStartKey);
230       ScannerCallable s = new ScannerCallable(getConnection(),
231         getTableName(), scan, this.scanMetrics);
232       s.setCaching(this.caching);
233       return s;
234     }
235 
236     /**
237      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
238      * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
239      * framework because it doesn't support multi-instances of the same metrics on the same machine;
240      * for scan/map reduce scenarios, we will have multiple scans running at the same time.
241      *
242      * By default, scan metrics are disabled; if the application wants to collect them, this behavior
243      * can be turned on by calling calling:
244      *
245      * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
246      */
247     private void writeScanMetrics() {
248       if (this.scanMetrics == null || scanMetricsPublished) {
249         return;
250       }
251       MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
252       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
253       scanMetricsPublished = true;
254     }
255 
256     public Result next() throws IOException {
257       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
258       if (cache.size() == 0 && this.closed) {
259         return null;
260       }
261       if (cache.size() == 0) {
262         Result [] values = null;
263         long remainingResultSize = maxScannerResultSize;
264         int countdown = this.caching;
265 
266         // This flag is set when we want to skip the result returned.  We do
267         // this when we reset scanner because it split under us.
268         boolean skipFirst = false;
269         boolean retryAfterOutOfOrderException  = true;
270         do {
271           try {
272             // Server returns a null values if scanning is to stop.  Else,
273             // returns an empty array if scanning is to go on and we've just
274             // exhausted current region.
275             values = callable.withRetries();
276             if (skipFirst && values != null && values.length == 1) {
277               skipFirst = false; // Already skipped, unset it before scanning again
278               values = callable.withRetries();
279             }
280             retryAfterOutOfOrderException  = true;
281           } catch (DoNotRetryIOException e) {
282             // DNRIOEs are thrown to make us break out of retries.  Some types of DNRIOEs want us
283             // to reset the scanner and come back in again.
284             if (e instanceof UnknownScannerException) {
285               long timeout = lastNext + scannerTimeout;
286               // If we are over the timeout, throw this exception to the client wrapped in
287               // a ScannerTimeoutException. Else, it's because the region moved and we used the old
288               // id against the new region server; reset the scanner.
289               if (timeout < System.currentTimeMillis()) {
290                 long elapsed = System.currentTimeMillis() - lastNext;
291                 ScannerTimeoutException ex = new ScannerTimeoutException(
292                     elapsed + "ms passed since the last invocation, " +
293                         "timeout is currently set to " + scannerTimeout);
294                 ex.initCause(e);
295                 throw ex;
296               }
297             } else {
298               // If exception is any but the list below throw it back to the client; else setup
299               // the scanner and retry.
300               Throwable cause = e.getCause();
301               if ((cause != null && cause instanceof NotServingRegionException) ||
302                 (cause != null && cause instanceof RegionServerStoppedException) ||
303                 e instanceof OutOfOrderScannerNextException) {
304                 // Pass
305                 // It is easier writing the if loop test as list of what is allowed rather than
306                 // as a list of what is not allowed... so if in here, it means we do not throw.
307               } else {
308                 throw e;
309               }
310             }
311             // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
312             if (this.lastResult != null) {
313               this.scan.setStartRow(this.lastResult.getRow());
314               // Skip first row returned.  We already let it out on previous
315               // invocation.
316               skipFirst = true;
317             }
318             if (e instanceof OutOfOrderScannerNextException) {
319               if (retryAfterOutOfOrderException) {
320                 retryAfterOutOfOrderException = false;
321               } else {
322                 // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
323                 throw new DoNotRetryIOException("Failed after retry of " +
324                   "OutOfOrderScannerNextException: was there a rpc timeout?", e);
325               }
326             }
327             // Clear region.
328             this.currentRegion = null;
329             // Set this to zero so we don't try and do an rpc and close on remote server when
330             // the exception we got was UnknownScanner or the Server is going down.
331             callable = null;
332             // This continue will take us to while at end of loop where we will set up new scanner.
333             continue;
334           }
335           long currentTime = System.currentTimeMillis();
336           if (this.scanMetrics != null ) {
337             this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
338           }
339           lastNext = currentTime;
340           if (values != null && values.length > 0) {
341             int i = 0;
342             if (skipFirst) {
343               skipFirst = false;
344               // We will cache one row less, which is fine
345               countdown--;
346               i = 1;
347             }
348             for (; i < values.length; i++) {
349               Result rs = values[i];
350               cache.add(rs);
351               for (KeyValue kv : rs.raw()) {
352                   remainingResultSize -= kv.heapSize();
353               }
354               countdown--;
355               this.lastResult = rs;
356             }
357           }
358           // Values == null means server-side filter has determined we must STOP
359         } while (remainingResultSize > 0 && countdown > 0 && nextScanner(values == null));
360       }
361 
362       if (cache.size() > 0) {
363         return cache.poll();
364       }
365 
366       // if we exhausted this scanner before calling close, write out the scan metrics
367       writeScanMetrics();
368       return null;
369     }
370 
371     /**
372      * Get <param>nbRows</param> rows.
373      * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
374      * setting (or hbase.client.scanner.caching in hbase-site.xml).
375      * @param nbRows number of rows to return
376      * @return Between zero and <param>nbRows</param> RowResults.  Scan is done
377      * if returned array is of zero-length (We never return null).
378      * @throws IOException
379      */
380     public Result [] next(int nbRows) throws IOException {
381       // Collect values to be returned here
382       ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
383       for(int i = 0; i < nbRows; i++) {
384         Result next = next();
385         if (next != null) {
386           resultSets.add(next);
387         } else {
388           break;
389         }
390       }
391       return resultSets.toArray(new Result[resultSets.size()]);
392     }
393 
394     public void close() {
395       if (!scanMetricsPublished) writeScanMetrics();
396       if (callable != null) {
397         callable.setClose();
398         try {
399           callable.withRetries();
400         } catch (IOException e) {
401           // We used to catch this error, interpret, and rethrow. However, we
402           // have since decided that it's not nice for a scanner's close to
403           // throw exceptions. Chances are it was just an UnknownScanner
404           // exception due to lease time out.
405         }
406         callable = null;
407       }
408       closed = true;
409     }
410 
411     long currentScannerId() {
412       return (callable == null) ? -1L : callable.scannerId;
413     }
414 
415     HRegionInfo currentRegionInfo() {
416       return currentRegion;
417     }
418 }