View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.IOException;
21  import java.util.LinkedList;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.DoNotRetryIOException;
30  import org.apache.hadoop.hbase.HBaseConfiguration;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.NotServingRegionException;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.UnknownScannerException;
37  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
38  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
39  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
41  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
42  import org.apache.hadoop.hbase.util.Bytes;
43  
44  /**
45   * Implements the scanner interface for the HBase client.
46   * If there are multiple regions in a table, this scanner will iterate
47   * through them all.
48   */
49  @InterfaceAudience.Public
50  @InterfaceStability.Stable
51  public class ClientScanner extends AbstractClientScanner {
52      private final Log LOG = LogFactory.getLog(this.getClass());
53      protected Scan scan;
54      protected boolean closed = false;
55      // Current region scanner is against.  Gets cleared if current region goes
56      // wonky: e.g. if it splits on us.
57      protected HRegionInfo currentRegion = null;
58      protected ScannerCallable callable = null;
59      protected final LinkedList<Result> cache = new LinkedList<Result>();
60      protected final int caching;
61      protected long lastNext;
62      // Keep lastResult returned successfully in case we have to reset scanner.
63      protected Result lastResult = null;
64      protected final long maxScannerResultSize;
65      private final HConnection connection;
66      private final TableName tableName;
67      protected final int scannerTimeout;
68      protected boolean scanMetricsPublished = false;
69      protected RpcRetryingCaller<Result []> caller;
70      protected RpcControllerFactory rpcControllerFactory;
71  
72      /**
73       * Create a new ClientScanner for the specified table. An HConnection will be
74       * retrieved using the passed Configuration.
75       * Note that the passed {@link Scan}'s start row maybe changed changed.
76       *
77       * @param conf The {@link Configuration} to use.
78       * @param scan {@link Scan} to use in this scanner
79       * @param tableName The table that we wish to scan
80       * @throws IOException
81       */
82      public ClientScanner(final Configuration conf, final Scan scan,
83          final TableName tableName) throws IOException {
84        this(conf, scan, tableName, HConnectionManager.getConnection(conf));
85      }
86  
87      /**
88       * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)}
89       */
90      @Deprecated
91      public ClientScanner(final Configuration conf, final Scan scan,
92          final byte [] tableName) throws IOException {
93        this(conf, scan, TableName.valueOf(tableName));
94      }
95  
96  
97      /**
98     * @deprecated use
99     *             {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)}
100    */
101   @Deprecated
102   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
103       HConnection connection) throws IOException {
104     this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf),
105         RpcControllerFactory.instantiate(conf));
106   }
107 
108   /**
109    * @deprecated Use
110    *             {@link #ClientScanner(Configuration, Scan, TableName, HConnection, RpcRetryingCallerFactory, RpcControllerFactory)}
111    */
112   @Deprecated
113   public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
114       HConnection connection) throws IOException {
115     this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
116         RpcControllerFactory.instantiate(conf));
117   }
118   
119   /**
120    * @deprecated Use
121    *             {@link #ClientScanner(Configuration, Scan, TableName, HConnection,
122    *             RpcRetryingCallerFactory, RpcControllerFactory)} instead.
123    */
124   @Deprecated
125   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
126       HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
127     this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
128   }
129 
130   /**
131    * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
132    * row maybe changed changed.
133    * @param conf The {@link Configuration} to use.
134    * @param scan {@link Scan} to use in this scanner
135    * @param tableName The table that we wish to scan
136    * @param connection Connection identifying the cluster
137    * @throws IOException
138    */
139   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
140       HConnection connection, RpcRetryingCallerFactory rpcFactory,
141       RpcControllerFactory controllerFactory) throws IOException {
142       if (LOG.isTraceEnabled()) {
143         LOG.trace("Scan table=" + tableName
144             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
145       }
146       this.scan = scan;
147       this.tableName = tableName;
148       this.lastNext = System.currentTimeMillis();
149       this.connection = connection;
150       if (scan.getMaxResultSize() > 0) {
151         this.maxScannerResultSize = scan.getMaxResultSize();
152       } else {
153         this.maxScannerResultSize = conf.getLong(
154           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
155           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
156       }
157       this.scannerTimeout = HBaseConfiguration.getInt(conf,
158         HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
159         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
160         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
161 
162       // check if application wants to collect scan metrics
163       initScanMetrics(scan);
164 
165       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
166       if (this.scan.getCaching() > 0) {
167         this.caching = this.scan.getCaching();
168       } else {
169         this.caching = conf.getInt(
170             HConstants.HBASE_CLIENT_SCANNER_CACHING,
171             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
172       }
173 
174       this.caller = rpcFactory.<Result[]> newCaller();
175       this.rpcControllerFactory = controllerFactory;
176 
177       initializeScannerInConstruction();
178     }
179 
180     protected void initializeScannerInConstruction() throws IOException{
181       // initialize the scanner
182       nextScanner(this.caching, false);
183     }
184 
185     protected HConnection getConnection() {
186       return this.connection;
187     }
188 
189     /**
190      * @return Table name
191      * @deprecated Since 0.96.0; use {@link #getTable()}
192      */
193     @Deprecated
194     protected byte [] getTableName() {
195       return this.tableName.getName();
196     }
197 
198     protected TableName getTable() {
199       return this.tableName;
200     }
201 
202     protected Scan getScan() {
203       return scan;
204     }
205 
206     protected long getTimestamp() {
207       return lastNext;
208     }
209 
210     // returns true if the passed region endKey
211     protected boolean checkScanStopRow(final byte [] endKey) {
212       if (this.scan.getStopRow().length > 0) {
213         // there is a stop row, check to see if we are past it.
214         byte [] stopRow = scan.getStopRow();
215         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
216           endKey, 0, endKey.length);
217         if (cmp <= 0) {
218           // stopRow <= endKey (endKey is equals to or larger than stopRow)
219           // This is a stop.
220           return true;
221         }
222       }
223       return false; //unlikely.
224     }
225 
226     /*
227      * Gets a scanner for the next region.  If this.currentRegion != null, then
228      * we will move to the endrow of this.currentRegion.  Else we will get
229      * scanner at the scan.getStartRow().  We will go no further, just tidy
230      * up outstanding scanners, if <code>currentRegion != null</code> and
231      * <code>done</code> is true.
232      * @param nbRows
233      * @param done Server-side says we're done scanning.
234      */
235   protected boolean nextScanner(int nbRows, final boolean done)
236     throws IOException {
237       // Close the previous scanner if it's open
238       if (this.callable != null) {
239         this.callable.setClose();
240         this.caller.callWithRetries(callable, scannerTimeout);
241         this.callable = null;
242       }
243 
244       // Where to start the next scanner
245       byte [] localStartKey;
246 
247       // if we're at end of table, close and return false to stop iterating
248       if (this.currentRegion != null) {
249         byte [] endKey = this.currentRegion.getEndKey();
250         if (endKey == null ||
251             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
252             checkScanStopRow(endKey) ||
253             done) {
254           close();
255           if (LOG.isTraceEnabled()) {
256             LOG.trace("Finished " + this.currentRegion);
257           }
258           return false;
259         }
260         localStartKey = endKey;
261         if (LOG.isTraceEnabled()) {
262           LOG.trace("Finished " + this.currentRegion);
263         }
264       } else {
265         localStartKey = this.scan.getStartRow();
266       }
267 
268       if (LOG.isDebugEnabled() && this.currentRegion != null) {
269         // Only worth logging if NOT first region in scan.
270         LOG.debug("Advancing internal scanner to startKey at '" +
271           Bytes.toStringBinary(localStartKey) + "'");
272       }
273       try {
274         callable = getScannerCallable(localStartKey, nbRows);
275         // Open a scanner on the region server starting at the
276         // beginning of the region
277         this.caller.callWithRetries(callable, scannerTimeout);
278         this.currentRegion = callable.getHRegionInfo();
279         if (this.scanMetrics != null) {
280           this.scanMetrics.countOfRegions.incrementAndGet();
281         }
282       } catch (IOException e) {
283         close();
284         throw e;
285       }
286       return true;
287     }
288 
289     @InterfaceAudience.Private
290     protected ScannerCallable getScannerCallable(byte [] localStartKey,
291         int nbRows) {
292       scan.setStartRow(localStartKey);
293       ScannerCallable s =
294           new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
295               this.rpcControllerFactory);
296       s.setCaching(nbRows);
297       return s;
298     }
299 
300     /**
301      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
302      * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
303      * framework because it doesn't support multi-instances of the same metrics on the same machine;
304      * for scan/map reduce scenarios, we will have multiple scans running at the same time.
305      *
306      * By default, scan metrics are disabled; if the application wants to collect them, this behavior
307      * can be turned on by calling calling:
308      *
309      * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
310      */
311     protected void writeScanMetrics() {
312       if (this.scanMetrics == null || scanMetricsPublished) {
313         return;
314       }
315       MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
316       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
317       scanMetricsPublished = true;
318     }
319 
320     @Override
321     public Result next() throws IOException {
322       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
323       if (cache.size() == 0 && this.closed) {
324         return null;
325       }
326       if (cache.size() == 0) {
327         Result [] values = null;
328         long remainingResultSize = maxScannerResultSize;
329         int countdown = this.caching;
330         // We need to reset it if it's a new callable that was created
331         // with a countdown in nextScanner
332         callable.setCaching(this.caching);
333         // This flag is set when we want to skip the result returned.  We do
334         // this when we reset scanner because it split under us.
335         boolean skipFirst = false;
336         boolean retryAfterOutOfOrderException  = true;
337         do {
338           try {
339             if (skipFirst) {
340               // Skip only the first row (which was the last row of the last
341               // already-processed batch).
342               callable.setCaching(1);
343               values = this.caller.callWithRetries(callable, scannerTimeout);
344               callable.setCaching(this.caching);
345               skipFirst = false;
346             }
347             // Server returns a null values if scanning is to stop.  Else,
348             // returns an empty array if scanning is to go on and we've just
349             // exhausted current region.
350             values = this.caller.callWithRetries(callable, scannerTimeout);
351             if (skipFirst && values != null && values.length == 1) {
352               skipFirst = false; // Already skipped, unset it before scanning again
353               values = this.caller.callWithRetries(callable, scannerTimeout);
354             }
355             retryAfterOutOfOrderException  = true;
356           } catch (DoNotRetryIOException e) {
357             // DNRIOEs are thrown to make us break out of retries.  Some types of DNRIOEs want us
358             // to reset the scanner and come back in again.
359             if (e instanceof UnknownScannerException) {
360               long timeout = lastNext + scannerTimeout;
361               // If we are over the timeout, throw this exception to the client wrapped in
362               // a ScannerTimeoutException. Else, it's because the region moved and we used the old
363               // id against the new region server; reset the scanner.
364               if (timeout < System.currentTimeMillis()) {
365                 long elapsed = System.currentTimeMillis() - lastNext;
366                 ScannerTimeoutException ex = new ScannerTimeoutException(
367                     elapsed + "ms passed since the last invocation, " +
368                         "timeout is currently set to " + scannerTimeout);
369                 ex.initCause(e);
370                 throw ex;
371               }
372             } else {
373               // If exception is any but the list below throw it back to the client; else setup
374               // the scanner and retry.
375               Throwable cause = e.getCause();
376               if ((cause != null && cause instanceof NotServingRegionException) ||
377                 (cause != null && cause instanceof RegionServerStoppedException) ||
378                 e instanceof OutOfOrderScannerNextException) {
379                 // Pass
380                 // It is easier writing the if loop test as list of what is allowed rather than
381                 // as a list of what is not allowed... so if in here, it means we do not throw.
382               } else {
383                 throw e;
384               }
385             }
386             // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
387             if (this.lastResult != null) {
388               this.scan.setStartRow(this.lastResult.getRow());
389               // Skip first row returned.  We already let it out on previous
390               // invocation.
391               skipFirst = true;
392             }
393             if (e instanceof OutOfOrderScannerNextException) {
394               if (retryAfterOutOfOrderException) {
395                 retryAfterOutOfOrderException = false;
396               } else {
397                 // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
398                 throw new DoNotRetryIOException("Failed after retry of " +
399                   "OutOfOrderScannerNextException: was there a rpc timeout?", e);
400               }
401             }
402             // Clear region.
403             this.currentRegion = null;
404             // Set this to zero so we don't try and do an rpc and close on remote server when
405             // the exception we got was UnknownScanner or the Server is going down.
406             callable = null;
407             // This continue will take us to while at end of loop where we will set up new scanner.
408             continue;
409           }
410           long currentTime = System.currentTimeMillis();
411           if (this.scanMetrics != null ) {
412             this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
413           }
414           lastNext = currentTime;
415           if (values != null && values.length > 0) {
416             for (Result rs : values) {
417               cache.add(rs);
418               for (Cell kv : rs.rawCells()) {
419                 // TODO make method in Cell or CellUtil
420                 remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
421               }
422               countdown--;
423               this.lastResult = rs;
424             }
425           }
426           // Values == null means server-side filter has determined we must STOP
427         } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
428       }
429 
430       if (cache.size() > 0) {
431         return cache.poll();
432       }
433 
434       // if we exhausted this scanner before calling close, write out the scan metrics
435       writeScanMetrics();
436       return null;
437     }
438 
439     @Override
440     public void close() {
441       if (!scanMetricsPublished) writeScanMetrics();
442       if (callable != null) {
443         callable.setClose();
444         try {
445           this.caller.callWithRetries(callable, scannerTimeout);
446         } catch (UnknownScannerException e) {
447            // We used to catch this error, interpret, and rethrow. However, we
448            // have since decided that it's not nice for a scanner's close to
449            // throw exceptions. Chances are it was just due to lease time out.
450         } catch (IOException e) {
451            /* An exception other than UnknownScanner is unexpected. */
452            LOG.warn("scanner failed to close. Exception follows: " + e);
453         }
454         callable = null;
455       }
456       closed = true;
457     }
458 }