View Javadoc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.util.LinkedList;
23  import java.util.concurrent.ExecutorService;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellUtil;
30  import org.apache.hadoop.hbase.DoNotRetryIOException;
31  import org.apache.hadoop.hbase.HBaseConfiguration;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.NotServingRegionException;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.UnknownScannerException;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
39  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
40  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
41  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
42  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
43  import org.apache.hadoop.hbase.util.Bytes;
44  
45  import com.google.common.annotations.VisibleForTesting;
46  
47  import static org.apache.hadoop.hbase.client.ReversedClientScanner.createClosestRowBefore;
48  
49  /**
50   * Implements the scanner interface for the HBase client.
51   * If there are multiple regions in a table, this scanner will iterate
52   * through them all.
53   */
54  @InterfaceAudience.Private
55  public class ClientScanner extends AbstractClientScanner {
56      private final Log LOG = LogFactory.getLog(this.getClass());
57      protected Scan scan;
58      protected boolean closed = false;
59      // Current region scanner is against.  Gets cleared if current region goes
60      // wonky: e.g. if it splits on us.
61      protected HRegionInfo currentRegion = null;
62      protected ScannerCallableWithReplicas callable = null;
63      protected final LinkedList<Result> cache = new LinkedList<Result>();
64      protected final int caching;
65      protected long lastNext;
66      // Keep lastResult returned successfully in case we have to reset scanner.
67      protected Result lastResult = null;
68      protected final long maxScannerResultSize;
69      private final ClusterConnection connection;
70      private final TableName tableName;
71      protected final int scannerTimeout;
72      protected boolean scanMetricsPublished = false;
73      protected RpcRetryingCaller<Result []> caller;
74      protected RpcControllerFactory rpcControllerFactory;
75      protected Configuration conf;
76      //The timeout on the primary. Applicable if there are multiple replicas for a region
77      //In that case, we will only wait for this much timeout on the primary before going
78      //to the replicas and trying the same scan. Note that the retries will still happen
79      //on each replica and the first successful results will be taken. A timeout of 0 is
80      //disallowed.
81      protected final int primaryOperationTimeout;
82      private int retries;
83      protected final ExecutorService pool;
84  
85    /**
86     * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
87     * row maybe changed changed.
88     * @param conf The {@link Configuration} to use.
89     * @param scan {@link Scan} to use in this scanner
90     * @param tableName The table that we wish to scan
91     * @param connection Connection identifying the cluster
92     * @throws IOException
93     */
94    public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
95        ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
96        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
97        throws IOException {
98        if (LOG.isTraceEnabled()) {
99          LOG.trace("Scan table=" + tableName
100             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
101       }
102       this.scan = scan;
103       this.tableName = tableName;
104       this.lastNext = System.currentTimeMillis();
105       this.connection = connection;
106       this.pool = pool;
107       this.primaryOperationTimeout = primaryOperationTimeout;
108       this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
109           HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
110       if (scan.getMaxResultSize() > 0) {
111         this.maxScannerResultSize = scan.getMaxResultSize();
112       } else {
113         this.maxScannerResultSize = conf.getLong(
114           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
115           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
116       }
117       this.scannerTimeout = HBaseConfiguration.getInt(conf,
118         HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
119         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
120         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
121 
122       // check if application wants to collect scan metrics
123       initScanMetrics(scan);
124 
125       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
126       if (this.scan.getCaching() > 0) {
127         this.caching = this.scan.getCaching();
128       } else {
129         this.caching = conf.getInt(
130             HConstants.HBASE_CLIENT_SCANNER_CACHING,
131             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
132       }
133 
134       this.caller = rpcFactory.<Result[]> newCaller();
135       this.rpcControllerFactory = controllerFactory;
136 
137       this.conf = conf;
138       initializeScannerInConstruction();
139     }
140 
141     protected void initializeScannerInConstruction() throws IOException{
142       // initialize the scanner
143       nextScanner(this.caching, false);
144     }
145 
146     protected ClusterConnection getConnection() {
147       return this.connection;
148     }
149 
150     /**
151      * @return Table name
152      * @deprecated Since 0.96.0; use {@link #getTable()}
153      */
154     @Deprecated
155     protected byte [] getTableName() {
156       return this.tableName.getName();
157     }
158 
159     protected TableName getTable() {
160       return this.tableName;
161     }
162 
163     protected int getRetries() {
164       return this.retries;
165     }
166 
167     protected int getScannerTimeout() {
168       return this.scannerTimeout;
169     }
170 
171     protected Configuration getConf() {
172       return this.conf;
173     }
174 
175     protected Scan getScan() {
176       return scan;
177     }
178 
179     protected ExecutorService getPool() {
180       return pool;
181     }
182 
183     protected int getPrimaryOperationTimeout() {
184       return primaryOperationTimeout;
185     }
186 
187     protected int getCaching() {
188       return caching;
189     }
190 
191     protected long getTimestamp() {
192       return lastNext;
193     }
194 
195     // returns true if the passed region endKey
196     protected boolean checkScanStopRow(final byte [] endKey) {
197       if (this.scan.getStopRow().length > 0) {
198         // there is a stop row, check to see if we are past it.
199         byte [] stopRow = scan.getStopRow();
200         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
201           endKey, 0, endKey.length);
202         if (cmp <= 0) {
203           // stopRow <= endKey (endKey is equals to or larger than stopRow)
204           // This is a stop.
205           return true;
206         }
207       }
208       return false; //unlikely.
209     }
210 
211     private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
212       // If we have just switched replica, don't go to the next scanner yet. Rather, try
213       // the scanner operations on the new replica, from the right point in the scan
214       // Note that when we switched to a different replica we left it at a point
215       // where we just did the "openScanner" with the appropriate startrow
216       if (callable != null && callable.switchedToADifferentReplica()) return true;
217       return nextScanner(nbRows, done);
218     }
219 
220     /*
221      * Gets a scanner for the next region.  If this.currentRegion != null, then
222      * we will move to the endrow of this.currentRegion.  Else we will get
223      * scanner at the scan.getStartRow().  We will go no further, just tidy
224      * up outstanding scanners, if <code>currentRegion != null</code> and
225      * <code>done</code> is true.
226      * @param nbRows
227      * @param done Server-side says we're done scanning.
228      */
229   protected boolean nextScanner(int nbRows, final boolean done)
230     throws IOException {
231       // Close the previous scanner if it's open
232       if (this.callable != null) {
233         this.callable.setClose();
234         call(callable, caller, scannerTimeout);
235         this.callable = null;
236       }
237 
238       // Where to start the next scanner
239       byte [] localStartKey;
240 
241       // if we're at end of table, close and return false to stop iterating
242       if (this.currentRegion != null) {
243         byte [] endKey = this.currentRegion.getEndKey();
244         if (endKey == null ||
245             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
246             checkScanStopRow(endKey) ||
247             done) {
248           close();
249           if (LOG.isTraceEnabled()) {
250             LOG.trace("Finished " + this.currentRegion);
251           }
252           return false;
253         }
254         localStartKey = endKey;
255         if (LOG.isTraceEnabled()) {
256           LOG.trace("Finished " + this.currentRegion);
257         }
258       } else {
259         localStartKey = this.scan.getStartRow();
260       }
261 
262       if (LOG.isDebugEnabled() && this.currentRegion != null) {
263         // Only worth logging if NOT first region in scan.
264         LOG.debug("Advancing internal scanner to startKey at '" +
265           Bytes.toStringBinary(localStartKey) + "'");
266       }
267       try {
268         callable = getScannerCallable(localStartKey, nbRows);
269         // Open a scanner on the region server starting at the
270         // beginning of the region
271         call(callable, caller, scannerTimeout);
272         this.currentRegion = callable.getHRegionInfo();
273         if (this.scanMetrics != null) {
274           this.scanMetrics.countOfRegions.incrementAndGet();
275         }
276       } catch (IOException e) {
277         close();
278         throw e;
279       }
280       return true;
281     }
282 
283   @VisibleForTesting
284   boolean isAnyRPCcancelled() {
285     return callable.isAnyRPCcancelled();
286   }
287 
288   static Result[] call(ScannerCallableWithReplicas callable,
289       RpcRetryingCaller<Result[]> caller, int scannerTimeout)
290       throws IOException, RuntimeException {
291     if (Thread.interrupted()) {
292       throw new InterruptedIOException();
293     }
294     // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
295     // we do a callWithRetries
296     return caller.callWithoutRetries(callable, scannerTimeout);
297   }
298 
299     @InterfaceAudience.Private
300     protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
301         int nbRows) {
302       scan.setStartRow(localStartKey);
303       ScannerCallable s =
304           new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
305               this.rpcControllerFactory);
306       s.setCaching(nbRows);
307       ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
308        s, pool, primaryOperationTimeout, scan,
309        retries, scannerTimeout, caching, conf, caller);
310       return sr;
311     }
312 
313     /**
314      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
315      * application or TableInputFormat.Later, we could push it to other systems. We don't use
316      * metrics framework because it doesn't support multi-instances of the same metrics on the same
317      * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
318      *
319      * By default, scan metrics are disabled; if the application wants to collect them, this
320      * behavior can be turned on by calling calling:
321      *
322      * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
323      */
324     protected void writeScanMetrics() {
325       if (this.scanMetrics == null || scanMetricsPublished) {
326         return;
327       }
328       MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
329       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
330       scanMetricsPublished = true;
331     }
332 
333     @Override
334     public Result next() throws IOException {
335       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
336       if (cache.size() == 0 && this.closed) {
337         return null;
338       }
339       if (cache.size() == 0) {
340         Result [] values = null;
341         long remainingResultSize = maxScannerResultSize;
342         int countdown = this.caching;
343         // We need to reset it if it's a new callable that was created
344         // with a countdown in nextScanner
345         callable.setCaching(this.caching);
346         // This flag is set when we want to skip the result returned.  We do
347         // this when we reset scanner because it split under us.
348         boolean retryAfterOutOfOrderException  = true;
349         do {
350           try {
351             // Server returns a null values if scanning is to stop.  Else,
352             // returns an empty array if scanning is to go on and we've just
353             // exhausted current region.
354             values = call(callable, caller, scannerTimeout);
355             // When the replica switch happens, we need to do certain operations
356             // again. The callable will openScanner with the right startkey
357             // but we need to pick up from there. Bypass the rest of the loop
358             // and let the catch-up happen in the beginning of the loop as it
359             // happens for the cases where we see exceptions. Since only openScanner
360             // would have happened, values would be null
361             if (values == null && callable.switchedToADifferentReplica()) {
362               this.currentRegion = callable.getHRegionInfo();
363               continue;
364             }
365             retryAfterOutOfOrderException  = true;
366           } catch (DoNotRetryIOException e) {
367             // DNRIOEs are thrown to make us break out of retries.  Some types of DNRIOEs want us
368             // to reset the scanner and come back in again.
369             if (e instanceof UnknownScannerException) {
370               long timeout = lastNext + scannerTimeout;
371               // If we are over the timeout, throw this exception to the client wrapped in
372               // a ScannerTimeoutException. Else, it's because the region moved and we used the old
373               // id against the new region server; reset the scanner.
374               if (timeout < System.currentTimeMillis()) {
375                 long elapsed = System.currentTimeMillis() - lastNext;
376                 ScannerTimeoutException ex = new ScannerTimeoutException(
377                     elapsed + "ms passed since the last invocation, " +
378                         "timeout is currently set to " + scannerTimeout);
379                 ex.initCause(e);
380                 throw ex;
381               }
382             } else {
383               // If exception is any but the list below throw it back to the client; else setup
384               // the scanner and retry.
385               Throwable cause = e.getCause();
386               if ((cause != null && cause instanceof NotServingRegionException) ||
387                 (cause != null && cause instanceof RegionServerStoppedException) ||
388                 e instanceof OutOfOrderScannerNextException) {
389                 // Pass
390                 // It is easier writing the if loop test as list of what is allowed rather than
391                 // as a list of what is not allowed... so if in here, it means we do not throw.
392               } else {
393                 throw e;
394               }
395             }
396             // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
397             if (this.lastResult != null) {
398               // The region has moved. We need to open a brand new scanner at
399               // the new location.
400               // Reset the startRow to the row we've seen last so that the new
401               // scanner starts at the correct row. Otherwise we may see previously
402               // returned rows again.
403               // (ScannerCallable by now has "relocated" the correct region)
404               if(scan.isReversed()){
405                 scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
406               }else {
407                 scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
408               }
409             }
410             if (e instanceof OutOfOrderScannerNextException) {
411               if (retryAfterOutOfOrderException) {
412                 retryAfterOutOfOrderException = false;
413               } else {
414                 // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
415                 throw new DoNotRetryIOException("Failed after retry of " +
416                   "OutOfOrderScannerNextException: was there a rpc timeout?", e);
417               }
418             }
419             // Clear region.
420             this.currentRegion = null;
421             // Set this to zero so we don't try and do an rpc and close on remote server when
422             // the exception we got was UnknownScanner or the Server is going down.
423             callable = null;
424             // This continue will take us to while at end of loop where we will set up new scanner.
425             continue;
426           }
427           long currentTime = System.currentTimeMillis();
428           if (this.scanMetrics != null) {
429             this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
430           }
431           lastNext = currentTime;
432           if (values != null && values.length > 0) {
433             for (Result rs : values) {
434               cache.add(rs);
435               // We don't make Iterator here
436               for (Cell cell : rs.rawCells()) {
437                 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
438               }
439               countdown--;
440               this.lastResult = rs;
441             }
442           }
443           // Values == null means server-side filter has determined we must STOP
444         } while (remainingResultSize > 0 && countdown > 0 &&
445             possiblyNextScanner(countdown, values == null));
446       }
447 
448       if (cache.size() > 0) {
449         return cache.poll();
450       }
451 
452       // if we exhausted this scanner before calling close, write out the scan metrics
453       writeScanMetrics();
454       return null;
455     }
456 
457     @Override
458     public void close() {
459       if (!scanMetricsPublished) writeScanMetrics();
460       if (callable != null) {
461         callable.setClose();
462         try {
463           call(callable, caller, scannerTimeout);
464         } catch (UnknownScannerException e) {
465            // We used to catch this error, interpret, and rethrow. However, we
466            // have since decided that it's not nice for a scanner's close to
467            // throw exceptions. Chances are it was just due to lease time out.
468           if (LOG.isDebugEnabled()) {
469             LOG.debug("scanner failed to close", e);
470           }
471         } catch (IOException e) {
472           /* An exception other than UnknownScanner is unexpected. */
473           LOG.warn("scanner failed to close.", e);
474         }
475         callable = null;
476       }
477       closed = true;
478     }
479 }