View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.IOException;
21  import java.util.LinkedList;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.DoNotRetryIOException;
30  import org.apache.hadoop.hbase.HBaseConfiguration;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.NotServingRegionException;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.UnknownScannerException;
37  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
38  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
39  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
40  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
41  import org.apache.hadoop.hbase.util.Bytes;
42  
43  /**
44   * Implements the scanner interface for the HBase client.
45   * If there are multiple regions in a table, this scanner will iterate
46   * through them all.
47   */
48  @InterfaceAudience.Public
49  @InterfaceStability.Stable
50  public class ClientScanner extends AbstractClientScanner {
51      private final Log LOG = LogFactory.getLog(this.getClass());
52      protected Scan scan;
53      protected boolean closed = false;
54      // Current region scanner is against.  Gets cleared if current region goes
55      // wonky: e.g. if it splits on us.
56      protected HRegionInfo currentRegion = null;
57      protected ScannerCallable callable = null;
58      protected final LinkedList<Result> cache = new LinkedList<Result>();
59      protected final int caching;
60      protected long lastNext;
61      // Keep lastResult returned successfully in case we have to reset scanner.
62      protected Result lastResult = null;
63      protected final long maxScannerResultSize;
64      private final HConnection connection;
65      private final TableName tableName;
66      protected final int scannerTimeout;
67      protected boolean scanMetricsPublished = false;
68      protected RpcRetryingCaller<Result []> caller;
69  
70      /**
71       * Create a new ClientScanner for the specified table. An HConnection will be
72       * retrieved using the passed Configuration.
73       * Note that the passed {@link Scan}'s start row maybe changed changed.
74       *
75       * @param conf The {@link Configuration} to use.
76       * @param scan {@link Scan} to use in this scanner
77       * @param tableName The table that we wish to scan
78       * @throws IOException
79       */
80      public ClientScanner(final Configuration conf, final Scan scan,
81          final TableName tableName) throws IOException {
82        this(conf, scan, tableName, HConnectionManager.getConnection(conf));
83      }
84  
85      /**
86       * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)}
87       */
88      @Deprecated
89      public ClientScanner(final Configuration conf, final Scan scan,
90          final byte [] tableName) throws IOException {
91        this(conf, scan, TableName.valueOf(tableName));
92      }
93  
94  
95      /**
96       * Create a new ClientScanner for the specified table
97       * Note that the passed {@link Scan}'s start row maybe changed changed.
98       *
99       * @param conf The {@link Configuration} to use.
100      * @param scan {@link Scan} to use in this scanner
101      * @param tableName The table that we wish to scan
102      * @param connection Connection identifying the cluster
103      * @throws IOException
104      */
105   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
106       HConnection connection) throws IOException {
107     this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
108   }
109 
110   /**
111    * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)}
112    */
113   @Deprecated
114   public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
115       HConnection connection) throws IOException {
116     this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf));
117   }
118 
119   /**
120    * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
121    * row maybe changed changed.
122    * @param conf The {@link Configuration} to use.
123    * @param scan {@link Scan} to use in this scanner
124    * @param tableName The table that we wish to scan
125    * @param connection Connection identifying the cluster
126    * @throws IOException
127    */
128   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
129       HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
130       if (LOG.isTraceEnabled()) {
131         LOG.trace("Scan table=" + tableName
132             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
133       }
134       this.scan = scan;
135       this.tableName = tableName;
136       this.lastNext = System.currentTimeMillis();
137       this.connection = connection;
138       if (scan.getMaxResultSize() > 0) {
139         this.maxScannerResultSize = scan.getMaxResultSize();
140       } else {
141         this.maxScannerResultSize = conf.getLong(
142           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
143           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
144       }
145       this.scannerTimeout = HBaseConfiguration.getInt(conf,
146         HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
147         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
148         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
149 
150       // check if application wants to collect scan metrics
151       initScanMetrics(scan);
152 
153       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
154       if (this.scan.getCaching() > 0) {
155         this.caching = this.scan.getCaching();
156       } else {
157         this.caching = conf.getInt(
158             HConstants.HBASE_CLIENT_SCANNER_CACHING,
159             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
160       }
161 
162     this.caller = rpcFactory.<Result[]> newCaller();
163 
164       initializeScannerInConstruction();
165     }
166 
167     protected void initializeScannerInConstruction() throws IOException{
168       // initialize the scanner
169       nextScanner(this.caching, false);
170     }
171 
172     protected HConnection getConnection() {
173       return this.connection;
174     }
175 
176     /**
177      * @return Table name
178      * @deprecated Since 0.96.0; use {@link #getTable()}
179      */
180     @Deprecated
181     protected byte [] getTableName() {
182       return this.tableName.getName();
183     }
184 
185     protected TableName getTable() {
186       return this.tableName;
187     }
188 
189     protected Scan getScan() {
190       return scan;
191     }
192 
193     protected long getTimestamp() {
194       return lastNext;
195     }
196 
197     // returns true if the passed region endKey
198     protected boolean checkScanStopRow(final byte [] endKey) {
199       if (this.scan.getStopRow().length > 0) {
200         // there is a stop row, check to see if we are past it.
201         byte [] stopRow = scan.getStopRow();
202         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
203           endKey, 0, endKey.length);
204         if (cmp <= 0) {
205           // stopRow <= endKey (endKey is equals to or larger than stopRow)
206           // This is a stop.
207           return true;
208         }
209       }
210       return false; //unlikely.
211     }
212 
213     /*
214      * Gets a scanner for the next region.  If this.currentRegion != null, then
215      * we will move to the endrow of this.currentRegion.  Else we will get
216      * scanner at the scan.getStartRow().  We will go no further, just tidy
217      * up outstanding scanners, if <code>currentRegion != null</code> and
218      * <code>done</code> is true.
219      * @param nbRows
220      * @param done Server-side says we're done scanning.
221      */
222   protected boolean nextScanner(int nbRows, final boolean done)
223     throws IOException {
224       // Close the previous scanner if it's open
225       if (this.callable != null) {
226         this.callable.setClose();
227         this.caller.callWithRetries(callable, scannerTimeout);
228         this.callable = null;
229       }
230 
231       // Where to start the next scanner
232       byte [] localStartKey;
233 
234       // if we're at end of table, close and return false to stop iterating
235       if (this.currentRegion != null) {
236         byte [] endKey = this.currentRegion.getEndKey();
237         if (endKey == null ||
238             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
239             checkScanStopRow(endKey) ||
240             done) {
241           close();
242           if (LOG.isTraceEnabled()) {
243             LOG.trace("Finished " + this.currentRegion);
244           }
245           return false;
246         }
247         localStartKey = endKey;
248         if (LOG.isTraceEnabled()) {
249           LOG.trace("Finished " + this.currentRegion);
250         }
251       } else {
252         localStartKey = this.scan.getStartRow();
253       }
254 
255       if (LOG.isDebugEnabled() && this.currentRegion != null) {
256         // Only worth logging if NOT first region in scan.
257         LOG.debug("Advancing internal scanner to startKey at '" +
258           Bytes.toStringBinary(localStartKey) + "'");
259       }
260       try {
261         callable = getScannerCallable(localStartKey, nbRows);
262         // Open a scanner on the region server starting at the
263         // beginning of the region
264         this.caller.callWithRetries(callable, scannerTimeout);
265         this.currentRegion = callable.getHRegionInfo();
266         if (this.scanMetrics != null) {
267           this.scanMetrics.countOfRegions.incrementAndGet();
268         }
269       } catch (IOException e) {
270         close();
271         throw e;
272       }
273       return true;
274     }
275 
276     @InterfaceAudience.Private
277     protected ScannerCallable getScannerCallable(byte [] localStartKey,
278         int nbRows) {
279       scan.setStartRow(localStartKey);
280       ScannerCallable s = new ScannerCallable(getConnection(),
281         getTable(), scan, this.scanMetrics);
282       s.setCaching(nbRows);
283       return s;
284     }
285 
286     /**
287      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
288      * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
289      * framework because it doesn't support multi-instances of the same metrics on the same machine;
290      * for scan/map reduce scenarios, we will have multiple scans running at the same time.
291      *
292      * By default, scan metrics are disabled; if the application wants to collect them, this behavior
293      * can be turned on by calling calling:
294      *
295      * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
296      */
297     protected void writeScanMetrics() {
298       if (this.scanMetrics == null || scanMetricsPublished) {
299         return;
300       }
301       MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
302       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
303       scanMetricsPublished = true;
304     }
305 
306     @Override
307     public Result next() throws IOException {
308       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
309       if (cache.size() == 0 && this.closed) {
310         return null;
311       }
312       if (cache.size() == 0) {
313         Result [] values = null;
314         long remainingResultSize = maxScannerResultSize;
315         int countdown = this.caching;
316         // We need to reset it if it's a new callable that was created
317         // with a countdown in nextScanner
318         callable.setCaching(this.caching);
319         // This flag is set when we want to skip the result returned.  We do
320         // this when we reset scanner because it split under us.
321         boolean skipFirst = false;
322         boolean retryAfterOutOfOrderException  = true;
323         do {
324           try {
325             if (skipFirst) {
326               // Skip only the first row (which was the last row of the last
327               // already-processed batch).
328               callable.setCaching(1);
329               values = this.caller.callWithRetries(callable, scannerTimeout);
330               callable.setCaching(this.caching);
331               skipFirst = false;
332             }
333             // Server returns a null values if scanning is to stop.  Else,
334             // returns an empty array if scanning is to go on and we've just
335             // exhausted current region.
336             values = this.caller.callWithRetries(callable, scannerTimeout);
337             if (skipFirst && values != null && values.length == 1) {
338               skipFirst = false; // Already skipped, unset it before scanning again
339               values = this.caller.callWithRetries(callable, scannerTimeout);
340             }
341             retryAfterOutOfOrderException  = true;
342           } catch (DoNotRetryIOException e) {
343             // DNRIOEs are thrown to make us break out of retries.  Some types of DNRIOEs want us
344             // to reset the scanner and come back in again.
345             if (e instanceof UnknownScannerException) {
346               long timeout = lastNext + scannerTimeout;
347               // If we are over the timeout, throw this exception to the client wrapped in
348               // a ScannerTimeoutException. Else, it's because the region moved and we used the old
349               // id against the new region server; reset the scanner.
350               if (timeout < System.currentTimeMillis()) {
351                 long elapsed = System.currentTimeMillis() - lastNext;
352                 ScannerTimeoutException ex = new ScannerTimeoutException(
353                     elapsed + "ms passed since the last invocation, " +
354                         "timeout is currently set to " + scannerTimeout);
355                 ex.initCause(e);
356                 throw ex;
357               }
358             } else {
359               // If exception is any but the list below throw it back to the client; else setup
360               // the scanner and retry.
361               Throwable cause = e.getCause();
362               if ((cause != null && cause instanceof NotServingRegionException) ||
363                 (cause != null && cause instanceof RegionServerStoppedException) ||
364                 e instanceof OutOfOrderScannerNextException) {
365                 // Pass
366                 // It is easier writing the if loop test as list of what is allowed rather than
367                 // as a list of what is not allowed... so if in here, it means we do not throw.
368               } else {
369                 throw e;
370               }
371             }
372             // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
373             if (this.lastResult != null) {
374               this.scan.setStartRow(this.lastResult.getRow());
375               // Skip first row returned.  We already let it out on previous
376               // invocation.
377               skipFirst = true;
378             }
379             if (e instanceof OutOfOrderScannerNextException) {
380               if (retryAfterOutOfOrderException) {
381                 retryAfterOutOfOrderException = false;
382               } else {
383                 // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
384                 throw new DoNotRetryIOException("Failed after retry of " +
385                   "OutOfOrderScannerNextException: was there a rpc timeout?", e);
386               }
387             }
388             // Clear region.
389             this.currentRegion = null;
390             // Set this to zero so we don't try and do an rpc and close on remote server when
391             // the exception we got was UnknownScanner or the Server is going down.
392             callable = null;
393             // This continue will take us to while at end of loop where we will set up new scanner.
394             continue;
395           }
396           long currentTime = System.currentTimeMillis();
397           if (this.scanMetrics != null ) {
398             this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
399           }
400           lastNext = currentTime;
401           if (values != null && values.length > 0) {
402             for (Result rs : values) {
403               cache.add(rs);
404               for (Cell kv : rs.rawCells()) {
405                 // TODO make method in Cell or CellUtil
406                 remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
407               }
408               countdown--;
409               this.lastResult = rs;
410             }
411           }
412           // Values == null means server-side filter has determined we must STOP
413         } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
414       }
415 
416       if (cache.size() > 0) {
417         return cache.poll();
418       }
419 
420       // if we exhausted this scanner before calling close, write out the scan metrics
421       writeScanMetrics();
422       return null;
423     }
424 
425     @Override
426     public void close() {
427       if (!scanMetricsPublished) writeScanMetrics();
428       if (callable != null) {
429         callable.setClose();
430         try {
431           this.caller.callWithRetries(callable, scannerTimeout);
432         } catch (UnknownScannerException e) {
433            // We used to catch this error, interpret, and rethrow. However, we
434            // have since decided that it's not nice for a scanner's close to
435            // throw exceptions. Chances are it was just due to lease time out.
436         } catch (IOException e) {
437            /* An exception other than UnknownScanner is unexpected. */
438            LOG.warn("scanner failed to close. Exception follows: " + e);
439         }
440         callable = null;
441       }
442       closed = true;
443     }
444 }