View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallSequenceOutOfOrderException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataOutputBuffer;
38  
39  /**
40   * Implements the scanner interface for the HBase client.
41   * If there are multiple regions in a table, this scanner will iterate
42   * through them all.
43   */
44  public class ClientScanner extends AbstractClientScanner {
45      private final Log LOG = LogFactory.getLog(this.getClass());
46      protected Scan scan;
47      protected boolean closed = false;
48      // Current region scanner is against.  Gets cleared if current region goes
49      // wonky: e.g. if it splits on us.
50      protected HRegionInfo currentRegion = null;
51      private ScannerCallable callable = null;
52      protected final LinkedList<Result> cache = new LinkedList<Result>();
53      protected final int caching;
54      protected long lastNext;
55      // Keep lastResult returned successfully in case we have to reset scanner.
56      protected Result lastResult = null;
57      protected ScanMetrics scanMetrics = null;
58      protected final long maxScannerResultSize;
59      private final HConnection connection;
60      private final byte[] tableName;
61      private final int scannerTimeout;
62      private boolean scanMetricsPublished = false;
63      
64      /**
65       * Create a new ClientScanner for the specified table. An HConnection will be
66       * retrieved using the passed Configuration.
67       * Note that the passed {@link Scan}'s start row maybe changed changed. 
68       * 
69       * @param conf The {@link Configuration} to use.
70       * @param scan {@link Scan} to use in this scanner
71       * @param tableName The table that we wish to scan
72       * @throws IOException
73       */
74      @Deprecated
75      public ClientScanner(final Configuration conf, final Scan scan,
76          final byte[] tableName) throws IOException {
77        this(conf, scan, tableName, HConnectionManager.getConnection(conf));
78      }
79   
80      /**
81       * Create a new ClientScanner for the specified table
82       * Note that the passed {@link Scan}'s start row maybe changed changed. 
83       * 
84       * @param conf The {@link Configuration} to use.
85       * @param scan {@link Scan} to use in this scanner
86       * @param tableName The table that we wish to scan
87       * @param connection Connection identifying the cluster
88       * @throws IOException
89       */
90      public ClientScanner(final Configuration conf, final Scan scan,
91        final byte[] tableName, HConnection connection) throws IOException {
92        if (LOG.isDebugEnabled()) {
93          LOG.debug("Creating scanner over "
94              + Bytes.toString(tableName)
95              + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
96        }
97        this.scan = scan;
98        this.tableName = tableName;
99        this.lastNext = System.currentTimeMillis();
100       this.connection = connection;
101       this.maxScannerResultSize = conf.getLong(
102           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
103           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
104       this.scannerTimeout = (int) conf.getLong(
105           HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
106           HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
107 
108       // check if application wants to collect scan metrics
109       byte[] enableMetrics = scan.getAttribute(
110         Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
111       if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
112         scanMetrics = new ScanMetrics();
113       }
114 
115       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
116       if (this.scan.getCaching() > 0) {
117         this.caching = this.scan.getCaching();
118       } else {
119         this.caching = conf.getInt("hbase.client.scanner.caching", 1);
120       }
121 
122       // initialize the scanner
123       initializeScannerInConstruction();
124     }
125     
126     protected void initializeScannerInConstruction() throws IOException{
127       // initialize the scanner
128       nextScanner(this.caching, false);
129     }
130 
131     protected HConnection getConnection() {
132       return this.connection;
133     }
134 
135     protected byte[] getTableName() {
136       return this.tableName;
137     }
138 
139     protected Scan getScan() {
140       return scan;
141     }
142 
143     protected long getTimestamp() {
144       return lastNext;
145     }
146 
147     // returns true if the passed region endKey
148     protected boolean checkScanStopRow(final byte [] endKey) {
149       if (this.scan.getStopRow().length > 0) {
150         // there is a stop row, check to see if we are past it.
151         byte [] stopRow = scan.getStopRow();
152         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
153           endKey, 0, endKey.length);
154         if (cmp <= 0) {
155           // stopRow <= endKey (endKey is equals to or larger than stopRow)
156           // This is a stop.
157           return true;
158         }
159       }
160       return false; //unlikely.
161     }
162 
163     /*
164      * Gets a scanner for the next region.  If this.currentRegion != null, then
165      * we will move to the endrow of this.currentRegion.  Else we will get
166      * scanner at the scan.getStartRow().  We will go no further, just tidy
167      * up outstanding scanners, if <code>currentRegion != null</code> and
168      * <code>done</code> is true.
169      * @param nbRows
170      * @param done Server-side says we're done scanning.
171      */
172     private boolean nextScanner(int nbRows, final boolean done)
173     throws IOException {
174       // Close the previous scanner if it's open
175       if (this.callable != null) {
176         this.callable.setClose();
177         callable.withRetries();
178         this.callable = null;
179       }
180 
181       // Where to start the next scanner
182       byte [] localStartKey;
183 
184       // if we're at end of table, close and return false to stop iterating
185       if (this.currentRegion != null) {
186         byte [] endKey = this.currentRegion.getEndKey();
187         if (endKey == null ||
188             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
189             checkScanStopRow(endKey) ||
190             done) {
191           close();
192           if (LOG.isDebugEnabled()) {
193             LOG.debug("Finished with scanning at " + this.currentRegion);
194           }
195           return false;
196         }
197         localStartKey = endKey;
198         if (LOG.isDebugEnabled()) {
199           LOG.debug("Finished with region " + this.currentRegion);
200         }
201       } else {
202         localStartKey = this.scan.getStartRow();
203       }
204 
205       if (LOG.isDebugEnabled()) {
206         LOG.debug("Advancing internal scanner to startKey at '" +
207           Bytes.toStringBinary(localStartKey) + "'");
208       }
209       try {
210         callable = getScannerCallable(localStartKey, nbRows);
211         // Open a scanner on the region server starting at the
212         // beginning of the region
213         callable.withRetries();
214         this.currentRegion = callable.getHRegionInfo();
215         if (this.scanMetrics != null) {
216           this.scanMetrics.countOfRegions.inc();
217         }
218       } catch (IOException e) {
219         close();
220         throw e;
221       }
222       return true;
223     }
224 
225     protected ScannerCallable getScannerCallable(byte [] localStartKey,
226         int nbRows) {
227       scan.setStartRow(localStartKey);
228       ScannerCallable s = new ScannerCallable(getConnection(),
229         getTableName(), scan, this.scanMetrics);
230       s.setCaching(nbRows);
231       return s;
232     }
233 
234     /**
235      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
236      * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
237      * framework because it doesn't support multi-instances of the same metrics on the same machine;
238      * for scan/map reduce scenarios, we will have multiple scans running at the same time.
239      *
240      * By default, scan metrics are disabled; if the application wants to collect them, this behavior
241      * can be turned on by calling calling:
242      *
243      * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
244      */
245     protected void writeScanMetrics() throws IOException {
246       if (this.scanMetrics == null || scanMetricsPublished) {
247         return;
248       }
249       final DataOutputBuffer d = new DataOutputBuffer();
250       scanMetrics.write(d);
251       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());
252       scanMetricsPublished = true;
253     }
254 
255     public Result next() throws IOException {
256       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
257       if (cache.size() == 0 && this.closed) {
258         return null;
259       }
260       if (cache.size() == 0) {
261         Result [] values = null;
262         long remainingResultSize = maxScannerResultSize;
263         int countdown = this.caching;
264         // We need to reset it if it's a new callable that was created
265         // with a countdown in nextScanner
266         callable.setCaching(this.caching);
267         // This flag is set when we want to skip the result returned.  We do
268         // this when we reset scanner because it split under us.
269         boolean skipFirst = false;
270         do {
271           try {
272             if (skipFirst) {
273               // Skip only the first row (which was the last row of the last
274               // already-processed batch).
275               callable.setCaching(1);
276               values = callable.withRetries();
277               callable.setCaching(this.caching);
278               skipFirst = false;
279             }
280             // Server returns a null values if scanning is to stop.  Else,
281             // returns an empty array if scanning is to go on and we've just
282             // exhausted current region.
283             values = callable.withRetries();
284           } catch (DoNotRetryIOException e) {
285             if (e instanceof UnknownScannerException) {
286               long timeout = lastNext + scannerTimeout;
287               // If we are over the timeout, throw this exception to the client
288               // Else, it's because the region moved and we used the old id
289               // against the new region server; reset the scanner.
290               if (timeout < System.currentTimeMillis()) {
291                 long elapsed = System.currentTimeMillis() - lastNext;
292                 ScannerTimeoutException ex = new ScannerTimeoutException(
293                     elapsed + "ms passed since the last invocation, " +
294                         "timeout is currently set to " + scannerTimeout);
295                 ex.initCause(e);
296                 throw ex;
297               }
298             } else {
299               Throwable cause = e.getCause();
300               if ((cause == null || (!(cause instanceof NotServingRegionException)
301                   && !(cause instanceof RegionServerStoppedException)))
302                   && !(e instanceof CallSequenceOutOfOrderException)) {
303                 throw e;
304               }
305             }
306             // Else, its signal from depths of ScannerCallable that we got an
307             // NSRE on a next and that we need to reset the scanner.
308             if (this.lastResult != null) {
309               // The region has moved. We need to open a brand new scanner at
310               // the new location.
311               // Reset the startRow to the row we've seen last so that the new
312               // scanner starts at the correct row. Otherwise we may see previously
313               // returned rows again.
314               // (ScannerCallable by now has "relocated" the correct region)
315               this.scan.setStartRow(this.lastResult.getRow());
316 
317               // Skip first row returned.  We already let it out on previous
318               // invocation.
319               skipFirst = true;
320             }
321             // Clear region
322             this.currentRegion = null;
323             continue;
324           }
325           long currentTime = System.currentTimeMillis();
326           if (this.scanMetrics != null ) {
327             this.scanMetrics.sumOfMillisSecBetweenNexts.inc(currentTime-lastNext);
328           }
329           lastNext = currentTime;
330           if (values != null && values.length > 0) {
331             for (Result rs : values) {
332               cache.add(rs);
333               for (KeyValue kv : rs.raw()) {
334                   remainingResultSize -= kv.heapSize();
335               }
336               countdown--;
337               this.lastResult = rs;
338             }
339           }
340           // Values == null means server-side filter has determined we must STOP
341         } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
342       }
343 
344       if (cache.size() > 0) {
345         return cache.poll();
346       }
347 
348       // if we exhausted this scanner before calling close, write out the scan metrics
349       writeScanMetrics();
350       return null;
351     }
352 
353     /**
354      * Get <param>nbRows</param> rows.
355      * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
356      * setting (or hbase.client.scanner.caching in hbase-site.xml).
357      * @param nbRows number of rows to return
358      * @return Between zero and <param>nbRows</param> RowResults.  Scan is done
359      * if returned array is of zero-length (We never return null).
360      * @throws IOException
361      */
362     public Result [] next(int nbRows) throws IOException {
363       // Collect values to be returned here
364       ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
365       for(int i = 0; i < nbRows; i++) {
366         Result next = next();
367         if (next != null) {
368           resultSets.add(next);
369         } else {
370           break;
371         }
372       }
373       return resultSets.toArray(new Result[resultSets.size()]);
374     }
375 
376     public void close() {
377       if (!scanMetricsPublished){
378         try {
379           writeScanMetrics();
380         } catch (IOException e) {
381         }
382       }
383       
384       if (callable != null) {
385         callable.setClose();
386         try {
387           callable.withRetries();
388         } catch (IOException e) {
389           // We used to catch this error, interpret, and rethrow. However, we
390           // have since decided that it's not nice for a scanner's close to
391           // throw exceptions. Chances are it was just an UnknownScanner
392           // exception due to lease time out.
393         }
394         callable = null;
395       }
396       closed = true;
397     }
398 }