View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.util.LinkedList;
23  import java.util.concurrent.ExecutorService;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellUtil;
30  import org.apache.hadoop.hbase.DoNotRetryIOException;
31  import org.apache.hadoop.hbase.HBaseConfiguration;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.NotServingRegionException;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.UnknownScannerException;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
39  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
40  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
41  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
42  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
43  import org.apache.hadoop.hbase.util.Bytes;
44  
45  /**
46   * Implements the scanner interface for the HBase client.
47   * If there are multiple regions in a table, this scanner will iterate
48   * through them all.
49   */
50  @InterfaceAudience.Private
51  public class ClientScanner extends AbstractClientScanner {
52      private final Log LOG = LogFactory.getLog(this.getClass());
53      protected Scan scan;
54      protected boolean closed = false;
55      // Current region scanner is against.  Gets cleared if current region goes
56      // wonky: e.g. if it splits on us.
57      protected HRegionInfo currentRegion = null;
58      protected ScannerCallableWithReplicas callable = null;
59      protected final LinkedList<Result> cache = new LinkedList<Result>();
60      protected final int caching;
61      protected long lastNext;
62      // Keep lastResult returned successfully in case we have to reset scanner.
63      protected Result lastResult = null;
64      protected final long maxScannerResultSize;
65      private final ClusterConnection connection;
66      private final TableName tableName;
67      protected final int scannerTimeout;
68      protected boolean scanMetricsPublished = false;
69      protected RpcRetryingCaller<Result []> caller;
70      protected RpcControllerFactory rpcControllerFactory;
71      protected Configuration conf;
72      //The timeout on the primary. Applicable if there are multiple replicas for a region
73      //In that case, we will only wait for this much timeout on the primary before going
74      //to the replicas and trying the same scan. Note that the retries will still happen
75      //on each replica and the first successful results will be taken. A timeout of 0 is
76      //disallowed.
77      protected final int primaryOperationTimeout;
78      private int retries;
79      protected final ExecutorService pool;
80  
81    /**
82     * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
83     * row maybe changed changed.
84     * @param conf The {@link Configuration} to use.
85     * @param scan {@link Scan} to use in this scanner
86     * @param tableName The table that we wish to scan
87     * @param connection Connection identifying the cluster
88     * @throws IOException
89     */
90    public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
91        ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
92        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException {
93        if (LOG.isTraceEnabled()) {
94          LOG.trace("Scan table=" + tableName
95              + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
96        }
97        this.scan = scan;
98        this.tableName = tableName;
99        this.lastNext = System.currentTimeMillis();
100       this.connection = connection;
101       this.pool = pool;
102       this.primaryOperationTimeout = primaryOperationTimeout;
103       this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
104           HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
105       if (scan.getMaxResultSize() > 0) {
106         this.maxScannerResultSize = scan.getMaxResultSize();
107       } else {
108         this.maxScannerResultSize = conf.getLong(
109           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
110           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
111       }
112       this.scannerTimeout = HBaseConfiguration.getInt(conf,
113         HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
114         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
115         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
116 
117       // check if application wants to collect scan metrics
118       initScanMetrics(scan);
119 
120       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
121       if (this.scan.getCaching() > 0) {
122         this.caching = this.scan.getCaching();
123       } else {
124         this.caching = conf.getInt(
125             HConstants.HBASE_CLIENT_SCANNER_CACHING,
126             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
127       }
128 
129       this.caller = rpcFactory.<Result[]> newCaller();
130       this.rpcControllerFactory = controllerFactory;
131 
132       this.conf = conf;
133       initializeScannerInConstruction();
134     }
135 
136     protected void initializeScannerInConstruction() throws IOException{
137       // initialize the scanner
138       nextScanner(this.caching, false);
139     }
140 
141     protected ClusterConnection getConnection() {
142       return this.connection;
143     }
144 
145     /**
146      * @return Table name
147      * @deprecated Since 0.96.0; use {@link #getTable()}
148      */
149     @Deprecated
150     protected byte [] getTableName() {
151       return this.tableName.getName();
152     }
153 
154     protected TableName getTable() {
155       return this.tableName;
156     }
157 
158     protected int getRetries() {
159       return this.retries;
160     }
161 
162     protected int getScannerTimeout() {
163       return this.scannerTimeout;
164     }
165 
166     protected Configuration getConf() {
167       return this.conf;
168     }
169 
170     protected Scan getScan() {
171       return scan;
172     }
173 
174     protected ExecutorService getPool() {
175       return pool;
176     }
177 
178     protected int getPrimaryOperationTimeout() {
179       return primaryOperationTimeout;
180     }
181 
182     protected int getCaching() {
183       return caching;
184     }
185 
186     protected long getTimestamp() {
187       return lastNext;
188     }
189 
190     // returns true if the passed region endKey
191     protected boolean checkScanStopRow(final byte [] endKey) {
192       if (this.scan.getStopRow().length > 0) {
193         // there is a stop row, check to see if we are past it.
194         byte [] stopRow = scan.getStopRow();
195         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
196           endKey, 0, endKey.length);
197         if (cmp <= 0) {
198           // stopRow <= endKey (endKey is equals to or larger than stopRow)
199           // This is a stop.
200           return true;
201         }
202       }
203       return false; //unlikely.
204     }
205 
206     private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
207       // If we have just switched replica, don't go to the next scanner yet. Rather, try
208       // the scanner operations on the new replica, from the right point in the scan
209       // Note that when we switched to a different replica we left it at a point
210       // where we just did the "openScanner" with the appropriate startrow
211       if (callable != null && callable.switchedToADifferentReplica()) return true;
212       return nextScanner(nbRows, done);
213     }
214 
215     /*
216      * Gets a scanner for the next region.  If this.currentRegion != null, then
217      * we will move to the endrow of this.currentRegion.  Else we will get
218      * scanner at the scan.getStartRow().  We will go no further, just tidy
219      * up outstanding scanners, if <code>currentRegion != null</code> and
220      * <code>done</code> is true.
221      * @param nbRows
222      * @param done Server-side says we're done scanning.
223      */
224   protected boolean nextScanner(int nbRows, final boolean done)
225     throws IOException {
226       // Close the previous scanner if it's open
227       if (this.callable != null) {
228         this.callable.setClose();
229         call(scan, callable, caller, scannerTimeout);
230         this.callable = null;
231       }
232 
233       // Where to start the next scanner
234       byte [] localStartKey;
235 
236       // if we're at end of table, close and return false to stop iterating
237       if (this.currentRegion != null) {
238         byte [] endKey = this.currentRegion.getEndKey();
239         if (endKey == null ||
240             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
241             checkScanStopRow(endKey) ||
242             done) {
243           close();
244           if (LOG.isTraceEnabled()) {
245             LOG.trace("Finished " + this.currentRegion);
246           }
247           return false;
248         }
249         localStartKey = endKey;
250         if (LOG.isTraceEnabled()) {
251           LOG.trace("Finished " + this.currentRegion);
252         }
253       } else {
254         localStartKey = this.scan.getStartRow();
255       }
256 
257       if (LOG.isDebugEnabled() && this.currentRegion != null) {
258         // Only worth logging if NOT first region in scan.
259         LOG.debug("Advancing internal scanner to startKey at '" +
260           Bytes.toStringBinary(localStartKey) + "'");
261       }
262       try {
263         callable = getScannerCallable(localStartKey, nbRows);
264         // Open a scanner on the region server starting at the
265         // beginning of the region
266         call(scan, callable, caller, scannerTimeout);
267         this.currentRegion = callable.getHRegionInfo();
268         if (this.scanMetrics != null) {
269           this.scanMetrics.countOfRegions.incrementAndGet();
270         }
271       } catch (IOException e) {
272         close();
273         throw e;
274       }
275       return true;
276     }
277 
278   static Result[] call(Scan scan, ScannerCallableWithReplicas callable,
279       RpcRetryingCaller<Result[]> caller, int scannerTimeout)
280       throws IOException, RuntimeException {
281     if (Thread.interrupted()) {
282       throw new InterruptedIOException();
283     }
284     // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
285     // we do a callWithRetries
286     return caller.callWithoutRetries(callable, scannerTimeout);
287   }
288 
289     @InterfaceAudience.Private
290     protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
291         int nbRows) {
292       scan.setStartRow(localStartKey);
293       ScannerCallable s =
294           new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
295               this.rpcControllerFactory);
296       s.setCaching(nbRows);
297       ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
298        s, pool, primaryOperationTimeout, scan,
299        retries, scannerTimeout, caching, conf, caller);
300       return sr;
301     }
302 
303     /**
304      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
305      * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
306      * framework because it doesn't support multi-instances of the same metrics on the same machine;
307      * for scan/map reduce scenarios, we will have multiple scans running at the same time.
308      *
309      * By default, scan metrics are disabled; if the application wants to collect them, this behavior
310      * can be turned on by calling calling:
311      *
312      * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
313      */
314     protected void writeScanMetrics() {
315       if (this.scanMetrics == null || scanMetricsPublished) {
316         return;
317       }
318       MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
319       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
320       scanMetricsPublished = true;
321     }
322 
323     @Override
324     public Result next() throws IOException {
325       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
326       if (cache.size() == 0 && this.closed) {
327         return null;
328       }
329       if (cache.size() == 0) {
330         Result [] values = null;
331         long remainingResultSize = maxScannerResultSize;
332         int countdown = this.caching;
333         // We need to reset it if it's a new callable that was created
334         // with a countdown in nextScanner
335         callable.setCaching(this.caching);
336         // This flag is set when we want to skip the result returned.  We do
337         // this when we reset scanner because it split under us.
338         boolean skipFirst = false;
339         boolean retryAfterOutOfOrderException  = true;
340         do {
341           try {
342             if (skipFirst) {
343               // Skip only the first row (which was the last row of the last
344               // already-processed batch).
345               callable.setCaching(1);
346               values = call(scan, callable, caller, scannerTimeout);
347               // When the replica switch happens, we need to do certain operations
348               // again. The scannercallable will openScanner with the right startkey
349               // but we need to pick up from there. Bypass the rest of the loop
350               // and let the catch-up happen in the beginning of the loop as it
351               // happens for the cases where we see exceptions. Since only openScanner
352               // would have happened, values would be null
353               if (values == null && callable.switchedToADifferentReplica()) {
354                 if (this.lastResult != null) { //only skip if there was something read earlier
355                   skipFirst = true;
356                 }
357                 this.currentRegion = callable.getHRegionInfo();
358                 continue;
359               }
360               callable.setCaching(this.caching);
361               skipFirst = false;
362             }
363             // Server returns a null values if scanning is to stop.  Else,
364             // returns an empty array if scanning is to go on and we've just
365             // exhausted current region.
366             values = call(scan, callable, caller, scannerTimeout);
367             if (skipFirst && values != null && values.length == 1) {
368               skipFirst = false; // Already skipped, unset it before scanning again
369               values = call(scan, callable, caller, scannerTimeout);
370             }
371             // When the replica switch happens, we need to do certain operations
372             // again. The callable will openScanner with the right startkey
373             // but we need to pick up from there. Bypass the rest of the loop
374             // and let the catch-up happen in the beginning of the loop as it
375             // happens for the cases where we see exceptions. Since only openScanner
376             // would have happened, values would be null
377             if (values == null && callable.switchedToADifferentReplica()) {
378               if (this.lastResult != null) { //only skip if there was something read earlier
379                 skipFirst = true;
380               }
381               this.currentRegion = callable.getHRegionInfo();
382               continue;
383             }
384             retryAfterOutOfOrderException  = true;
385           } catch (DoNotRetryIOException e) {
386             // DNRIOEs are thrown to make us break out of retries.  Some types of DNRIOEs want us
387             // to reset the scanner and come back in again.
388             if (e instanceof UnknownScannerException) {
389               long timeout = lastNext + scannerTimeout;
390               // If we are over the timeout, throw this exception to the client wrapped in
391               // a ScannerTimeoutException. Else, it's because the region moved and we used the old
392               // id against the new region server; reset the scanner.
393               if (timeout < System.currentTimeMillis()) {
394                 long elapsed = System.currentTimeMillis() - lastNext;
395                 ScannerTimeoutException ex = new ScannerTimeoutException(
396                     elapsed + "ms passed since the last invocation, " +
397                         "timeout is currently set to " + scannerTimeout);
398                 ex.initCause(e);
399                 throw ex;
400               }
401             } else {
402               // If exception is any but the list below throw it back to the client; else setup
403               // the scanner and retry.
404               Throwable cause = e.getCause();
405               if ((cause != null && cause instanceof NotServingRegionException) ||
406                 (cause != null && cause instanceof RegionServerStoppedException) ||
407                 e instanceof OutOfOrderScannerNextException) {
408                 // Pass
409                 // It is easier writing the if loop test as list of what is allowed rather than
410                 // as a list of what is not allowed... so if in here, it means we do not throw.
411               } else {
412                 throw e;
413               }
414             }
415             // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
416             if (this.lastResult != null) {
417               // The region has moved. We need to open a brand new scanner at
418               // the new location.
419               // Reset the startRow to the row we've seen last so that the new
420               // scanner starts at the correct row. Otherwise we may see previously
421               // returned rows again.
422               // (ScannerCallable by now has "relocated" the correct region)
423               this.scan.setStartRow(this.lastResult.getRow());
424 
425               // Skip first row returned.  We already let it out on previous
426               // invocation.
427               skipFirst = true;
428             }
429             if (e instanceof OutOfOrderScannerNextException) {
430               if (retryAfterOutOfOrderException) {
431                 retryAfterOutOfOrderException = false;
432               } else {
433                 // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
434                 throw new DoNotRetryIOException("Failed after retry of " +
435                   "OutOfOrderScannerNextException: was there a rpc timeout?", e);
436               }
437             }
438             // Clear region.
439             this.currentRegion = null;
440             // Set this to zero so we don't try and do an rpc and close on remote server when
441             // the exception we got was UnknownScanner or the Server is going down.
442             callable = null;
443             // This continue will take us to while at end of loop where we will set up new scanner.
444             continue;
445           }
446           long currentTime = System.currentTimeMillis();
447           if (this.scanMetrics != null ) {
448             this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
449           }
450           lastNext = currentTime;
451           if (values != null && values.length > 0) {
452             for (Result rs : values) {
453               cache.add(rs);
454               // We don't make Iterator here
455               for (Cell cell : rs.rawCells()) {
456                 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
457               }
458               countdown--;
459               this.lastResult = rs;
460             }
461           }
462           // Values == null means server-side filter has determined we must STOP
463         } while (remainingResultSize > 0 && countdown > 0 &&
464             possiblyNextScanner(countdown, values == null));
465       }
466 
467       if (cache.size() > 0) {
468         return cache.poll();
469       }
470 
471       // if we exhausted this scanner before calling close, write out the scan metrics
472       writeScanMetrics();
473       return null;
474     }
475 
476     @Override
477     public void close() {
478       if (!scanMetricsPublished) writeScanMetrics();
479       if (callable != null) {
480         callable.setClose();
481         try {
482           call(scan, callable, caller, scannerTimeout);
483         } catch (UnknownScannerException e) {
484            // We used to catch this error, interpret, and rethrow. However, we
485            // have since decided that it's not nice for a scanner's close to
486            // throw exceptions. Chances are it was just due to lease time out.
487         } catch (IOException e) {
488            /* An exception other than UnknownScanner is unexpected. */
489            LOG.warn("scanner failed to close. Exception follows: " + e);
490         }
491         callable = null;
492       }
493       closed = true;
494     }
495 }