View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.concurrent.ExecutorService;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
34  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.RequestConverter;
37  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
38  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
39  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
40  import org.apache.hadoop.hbase.util.Bytes;
41  
42  import com.google.common.annotations.VisibleForTesting;
43  import com.google.protobuf.ServiceException;
44  
45  /**
46   * Client scanner for small scan. Generally, only one RPC is called to fetch the
47   * scan results, unless the results cross multiple regions or the row count of
48   * results excess the caching.
49   *
50   * For small scan, it will get better performance than {@link ClientScanner}
51   */
52  @InterfaceAudience.Private
53  public class ClientSmallScanner extends ClientScanner {
54    private static final Log LOG = LogFactory.getLog(ClientSmallScanner.class);
55    private ScannerCallableWithReplicas smallScanCallable = null;
56    private SmallScannerCallableFactory callableFactory;
57  
58    /**
59     * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
60     * 's start row maybe changed changed.
61     *
62     * @param conf
63     *          The {@link Configuration} to use.
64     * @param scan
65     *          {@link Scan} to use in this scanner
66     * @param tableName
67     *          The table that we wish to rangeGet
68     * @param connection
69     *          Connection identifying the cluster
70     * @param rpcFactory
71     *          Factory used to create the {@link RpcRetryingCaller}
72     * @param controllerFactory
73     *          Factory used to access RPC payloads
74     * @param pool
75     *          Threadpool for RPC threads
76     * @param primaryOperationTimeout
77     *          Call timeout
78     * @throws IOException
79     *           If the remote call fails
80     */
81    public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
82        ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
83        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
84        throws IOException {
85      this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
86          primaryOperationTimeout, new SmallScannerCallableFactory());
87    }
88  
89    /**
90     * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
91     * 's start row maybe changed changed. Intended for unit tests to provide their own
92     * {@link SmallScannerCallableFactory} implementation/mock.
93     *
94     * @param conf
95     *          The {@link Configuration} to use.
96     * @param scan
97     *          {@link Scan} to use in this scanner
98     * @param tableName
99     *          The table that we wish to rangeGet
100    * @param connection
101    *          Connection identifying the cluster
102    * @param rpcFactory
103    *          Factory used to create the {@link RpcRetryingCaller}
104    * @param controllerFactory
105    *          Factory used to access RPC payloads
106    * @param pool
107    *          Threadpool for RPC threads
108    * @param primaryOperationTimeout
109    *          Call timeout
110    * @param callableFactory
111    *          Factory used to create the {@link SmallScannerCallable}
112    * @throws IOException
113    */
114   @VisibleForTesting
115   ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
116       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
117       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
118       SmallScannerCallableFactory callableFactory) throws IOException {
119     super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
120         primaryOperationTimeout);
121     this.callableFactory = callableFactory;
122   }
123 
124   @Override
125   protected void initializeScannerInConstruction() throws IOException {
126     // No need to initialize the scanner when constructing instance, do it when
127     // calling next(). Do nothing here.
128   }
129 
130   /**
131    * Gets a scanner for following scan. Move to next region or continue from the
132    * last result or start from the start row.
133    * @param nbRows
134    * @param done true if Server-side says we're done scanning.
135    * @param currentRegionDone true if scan is over on current region
136    * @return true if has next scanner
137    * @throws IOException
138    */
139   private boolean nextScanner(int nbRows, final boolean done,
140       boolean currentRegionDone) throws IOException {
141     // Where to start the next getter
142     byte[] localStartKey;
143     int cacheNum = nbRows;
144     boolean regionChanged = true;
145     // if we're at end of table, close and return false to stop iterating
146     if (this.currentRegion != null && currentRegionDone) {
147       byte[] endKey = this.currentRegion.getEndKey();
148       if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
149           || checkScanStopRow(endKey) || done) {
150         close();
151         if (LOG.isTraceEnabled()) {
152           LOG.trace("Finished with small scan at " + this.currentRegion);
153         }
154         return false;
155       }
156       localStartKey = endKey;
157       if (LOG.isTraceEnabled()) {
158         LOG.trace("Finished with region " + this.currentRegion);
159       }
160     } else if (this.lastResult != null) {
161       regionChanged = false;
162       localStartKey = Bytes.add(lastResult.getRow(), new byte[1]);
163     } else {
164       localStartKey = this.scan.getStartRow();
165     }
166 
167     if (LOG.isTraceEnabled()) {
168       LOG.trace("Advancing internal small scanner to startKey at '"
169           + Bytes.toStringBinary(localStartKey) + "'");
170     }
171     smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
172         getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
173         getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
174     if (this.scanMetrics != null && regionChanged) {
175       this.scanMetrics.countOfRegions.incrementAndGet();
176     }
177     return true;
178   }
179 
180   static class SmallScannerCallable extends ScannerCallable {
181     public SmallScannerCallable(
182         ClusterConnection connection, TableName table, Scan scan,
183         ScanMetrics scanMetrics, RpcControllerFactory controllerFactory, int caching, int id) {
184       super(connection, table, scan, scanMetrics, controllerFactory, id);
185       this.setCaching(caching);
186     }
187 
188     @Override
189     public Result[] call(int timeout) throws IOException {
190       if (this.closed) return null;
191       if (Thread.interrupted()) {
192         throw new InterruptedIOException();
193       }
194       ScanRequest request = RequestConverter.buildScanRequest(getLocation()
195           .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
196       ScanResponse response = null;
197       controller = controllerFactory.newController();
198       try {
199         controller.setPriority(getTableName());
200         controller.setCallTimeout(timeout);
201         response = getStub().scan(controller, request);
202         Result[] results = ResponseConverter.getResults(controller.cellScanner(),
203             response);
204         if (response.hasMoreResultsInRegion()) {
205           setHasMoreResultsContext(true);
206           setServerHasMoreResults(response.getMoreResultsInRegion());
207         } else {
208           setHasMoreResultsContext(false);
209         }
210         // We need to update result metrics since we are overriding call()
211         updateResultsMetrics(results);
212         return results;
213       } catch (ServiceException se) {
214         throw ProtobufUtil.getRemoteException(se);
215       }
216     }
217 
218     @Override
219     public ScannerCallable getScannerCallableForReplica(int id) {
220       return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(),
221           scanMetrics, controllerFactory, getCaching(), id);
222     }
223   }
224 
225   @Override
226   public Result next() throws IOException {
227     // If the scanner is closed and there's nothing left in the cache, next is a
228     // no-op.
229     if (cache.size() == 0 && this.closed) {
230       return null;
231     }
232     if (cache.size() == 0) {
233       loadCache();
234     }
235 
236     if (cache.size() > 0) {
237       return cache.poll();
238     }
239     // if we exhausted this scanner before calling close, write out the scan
240     // metrics
241     writeScanMetrics();
242     return null;
243   }
244 
245   @Override
246   protected void loadCache() throws IOException {
247     Result[] values = null;
248     long remainingResultSize = maxScannerResultSize;
249     int countdown = this.caching;
250     boolean currentRegionDone = false;
251     // Values == null means server-side filter has determined we must STOP
252     while (remainingResultSize > 0 && countdown > 0
253         && nextScanner(countdown, values == null, currentRegionDone)) {
254       // Server returns a null values if scanning is to stop. Else,
255       // returns an empty array if scanning is to go on and we've just
256       // exhausted current region.
257       // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
258       // we do a callWithRetries
259       values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
260       this.currentRegion = smallScanCallable.getHRegionInfo();
261       long currentTime = System.currentTimeMillis();
262       if (this.scanMetrics != null) {
263         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
264             - lastNext);
265       }
266       lastNext = currentTime;
267       if (values != null && values.length > 0) {
268         for (int i = 0; i < values.length; i++) {
269           Result rs = values[i];
270           cache.add(rs);
271           // We don't make Iterator here
272           for (Cell cell : rs.rawCells()) {
273             remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
274           }
275           countdown--;
276           this.lastResult = rs;
277         }
278       }
279       if (smallScanCallable.hasMoreResultsContext()) {
280         // If the server has more results, the current region is not done
281         currentRegionDone = !smallScanCallable.getServerHasMoreResults();
282       } else {
283         // not guaranteed to get the context in older versions, fall back to checking countdown
284         currentRegionDone = countdown > 0;
285       }
286     }
287   }
288 
289   public void close() {
290     if (!scanMetricsPublished) writeScanMetrics();
291     closed = true;
292   }
293 
294   @VisibleForTesting
295   protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
296     this.callableFactory = callableFactory;
297   }
298 
299   @InterfaceAudience.Private
300   protected static class SmallScannerCallableFactory {
301 
302     public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
303         Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
304         RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
305         int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller) {
306       scan.setStartRow(localStartKey);
307       SmallScannerCallable s = new SmallScannerCallable(
308         connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0);
309       ScannerCallableWithReplicas scannerCallableWithReplicas =
310           new ScannerCallableWithReplicas(table, connection,
311               s, pool, primaryOperationTimeout, scan, retries,
312               scannerTimeout, cacheNum, conf, caller);
313       return scannerCallableWithReplicas;
314     }
315 
316   }
317 }