View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
23  import com.google.protobuf.ServiceException;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellUtil;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
33  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
34  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.RequestConverter;
37  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
38  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39  import org.apache.hadoop.hbase.util.Bytes;
40  
41  import com.google.common.annotations.VisibleForTesting;
42  
43  import java.io.IOException;
44  import java.io.InterruptedIOException;
45  import java.util.concurrent.ExecutorService;
46  
47  /**
48   * <p>
49   * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
50   * scan results, unless the results cross multiple regions or the row count of
51   * results exceed the caching.
52   * </p>
53   * For small scan, it will get better performance than {@link ReversedClientScanner}
54   */
55  @InterfaceAudience.Private
56  public class ClientSmallReversedScanner extends ReversedClientScanner {
57    private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
58    private ScannerCallableWithReplicas smallReversedScanCallable = null;
59    private SmallReversedScannerCallableFactory callableFactory;
60  
61    /**
62     * Create a new ReversibleClientScanner for the specified table. Take note that the passed
63     * {@link Scan} 's start row maybe changed changed.
64     *
65     * @param conf
66     *          The {@link Configuration} to use.
67     * @param scan
68     *          {@link Scan} to use in this scanner
69     * @param tableName
70     *          The table that we wish to rangeGet
71     * @param connection
72     *          Connection identifying the cluster
73     * @param rpcFactory
74     *          Factory used to create the {@link RpcRetryingCaller}
75     * @param controllerFactory
76     *          Factory used to access RPC payloads
77     * @param pool
78     *          Threadpool for RPC threads
79     * @param primaryOperationTimeout
80     *          Call timeout
81     * @throws IOException
82     *           If the remote call fails
83     */
84    public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
85        final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
86        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
87        throws IOException {
88      this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
89          primaryOperationTimeout, new SmallReversedScannerCallableFactory());
90    }
91  
92    /**
93     * Create a new ReversibleClientScanner for the specified table. Take note that the passed
94     * {@link Scan}'s start row may be changed.
95     *
96     * @param conf
97     *          The {@link Configuration} to use.
98     * @param scan
99     *          {@link Scan} to use in this scanner
100    * @param tableName
101    *          The table that we wish to rangeGet
102    * @param connection
103    *          Connection identifying the cluster
104    * @param rpcFactory
105    *          Factory used to create the {@link RpcRetryingCaller}
106    * @param controllerFactory
107    *          Factory used to access RPC payloads
108    * @param pool
109    *          Threadpool for RPC threads
110    * @param primaryOperationTimeout
111    *          Call timeout
112    * @param callableFactory
113    *          Factory used to create the {@link SmallScannerCallable}
114    * @throws IOException
115    *           If the remote call fails
116    */
117   @VisibleForTesting
118   ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
119       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
120       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
121       SmallReversedScannerCallableFactory callableFactory) throws IOException {
122     super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
123         primaryOperationTimeout);
124     this.callableFactory = callableFactory;
125   }
126 
127   /**
128    * Gets a scanner for following scan. Move to next region or continue from the last result or
129    * start from the start row.
130    *
131    * @param nbRows
132    * @param done
133    *          true if Server-side says we're done scanning.
134    * @param currentRegionDone
135    *          true if scan is over on current region
136    * @return true if has next scanner
137    * @throws IOException
138    */
139   private boolean nextScanner(int nbRows, final boolean done,
140                               boolean currentRegionDone) throws IOException {
141     // Where to start the next getter
142     byte[] localStartKey;
143     int cacheNum = nbRows;
144     boolean regionChanged = true;
145     boolean isFirstRegionToLocate = false;
146     // if we're at end of table, close and return false to stop iterating
147     if (this.currentRegion != null && currentRegionDone) {
148       byte[] startKey = this.currentRegion.getStartKey();
149       if (startKey == null
150           || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
151           || checkScanStopRow(startKey) || done) {
152         close();
153         if (LOG.isDebugEnabled()) {
154           LOG.debug("Finished with small scan at " + this.currentRegion);
155         }
156         return false;
157       }
158       // We take the row just under to get to the previous region.
159       localStartKey = createClosestRowBefore(startKey);
160       if (LOG.isDebugEnabled()) {
161         LOG.debug("Finished with region " + this.currentRegion);
162       }
163     } else if (this.lastResult != null) {
164       regionChanged = false;
165       localStartKey = createClosestRowBefore(lastResult.getRow());
166     } else {
167       localStartKey = this.scan.getStartRow();
168       isFirstRegionToLocate = true;
169     }
170 
171     if (!isFirstRegionToLocate
172         && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
173       // when non-firstRegion & localStartKey is empty bytes, no more rowKey should scan.
174       // otherwise, maybe infinity results with RowKey=0x00 will return.
175       return false;
176     }
177 
178     if (LOG.isTraceEnabled()) {
179       LOG.trace("Advancing internal small scanner to startKey at '"
180           + Bytes.toStringBinary(localStartKey) + "'");
181     }
182 
183     smallReversedScanCallable =
184         callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(),
185           localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
186           getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate);
187 
188     if (this.scanMetrics != null && regionChanged) {
189       this.scanMetrics.countOfRegions.incrementAndGet();
190     }
191     return true;
192   }
193 
194   @Override
195   public Result next() throws IOException {
196     // If the scanner is closed and there's nothing left in the cache, next is a
197     // no-op.
198     if (cache.size() == 0 && this.closed) {
199       return null;
200     }
201     if (cache.size() == 0) {
202       loadCache();
203     }
204 
205     if (cache.size() > 0) {
206       return cache.poll();
207     }
208     // if we exhausted this scanner before calling close, write out the scan
209     // metrics
210     writeScanMetrics();
211     return null;
212   }
213 
214   @Override
215   protected void loadCache() throws IOException {
216     Result[] values = null;
217     long remainingResultSize = maxScannerResultSize;
218     int countdown = this.caching;
219     boolean currentRegionDone = false;
220     // Values == null means server-side filter has determined we must STOP
221     while (remainingResultSize > 0 && countdown > 0
222         && nextScanner(countdown, values == null, currentRegionDone)) {
223       // Server returns a null values if scanning is to stop. Else,
224       // returns an empty array if scanning is to go on and we've just
225       // exhausted current region.
226       // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
227       // we do a callWithRetries
228       values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout);
229       this.currentRegion = smallReversedScanCallable.getHRegionInfo();
230       long currentTime = System.currentTimeMillis();
231       if (this.scanMetrics != null) {
232         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
233             - lastNext);
234       }
235       lastNext = currentTime;
236       if (values != null && values.length > 0) {
237         for (int i = 0; i < values.length; i++) {
238           Result rs = values[i];
239           cache.add(rs);
240           // We don't make Iterator here
241           for (Cell cell : rs.rawCells()) {
242             remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
243           }
244           countdown--;
245           this.lastResult = rs;
246         }
247       }
248       if (smallReversedScanCallable.hasMoreResultsContext()) {
249         currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults();
250       } else {
251         currentRegionDone = countdown > 0;
252       }
253     }
254   }
255 
256   @Override
257   protected void initializeScannerInConstruction() throws IOException {
258     // No need to initialize the scanner when constructing instance, do it when
259     // calling next(). Do nothing here.
260   }
261 
262   @Override
263   public void close() {
264     if (!scanMetricsPublished) writeScanMetrics();
265     closed = true;
266   }
267 
268   @VisibleForTesting
269   protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) {
270     this.callableFactory = callableFactory;
271   }
272 
273   /**
274    * A reversed ScannerCallable which supports backward small scanning.
275    */
276   static class SmallReversedScannerCallable extends ReversedScannerCallable {
277 
278     public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan,
279         ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory,
280         int caching, int replicaId) {
281       super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId);
282       this.setCaching(caching);
283     }
284 
285     @Override
286     public Result[] call(int timeout) throws IOException {
287       if (this.closed) return null;
288       if (Thread.interrupted()) {
289         throw new InterruptedIOException();
290       }
291       ClientProtos.ScanRequest request = RequestConverter.buildScanRequest(
292         getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true);
293       ClientProtos.ScanResponse response = null;
294       controller = controllerFactory.newController();
295       try {
296         controller.setPriority(getTableName());
297         controller.setCallTimeout(timeout);
298         response = getStub().scan(controller, request);
299         Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
300         if (response.hasMoreResultsInRegion()) {
301           setHasMoreResultsContext(true);
302           setServerHasMoreResults(response.getMoreResultsInRegion());
303         } else {
304           setHasMoreResultsContext(false);
305         }
306         // We need to update result metrics since we are overriding call()
307         updateResultsMetrics(results);
308         return results;
309       } catch (ServiceException se) {
310         throw ProtobufUtil.getRemoteException(se);
311       }
312     }
313 
314     @Override
315     public ScannerCallable getScannerCallableForReplica(int id) {
316       return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(),
317           scanMetrics, locateStartRow, controllerFactory, getCaching(), id);
318     }
319   }
320 
321   protected static class SmallReversedScannerCallableFactory {
322 
323     public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
324         Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
325         RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
326         int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
327         boolean isFirstRegionToLocate) {
328       byte[] locateStartRow = null;
329       if (isFirstRegionToLocate
330           && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
331         // HBASE-16886: if not setting startRow, then we will use a range [MAX_BYTE_ARRAY, +oo) to
332         // locate a region list, and the last one in region list is the region where our scan start.
333         locateStartRow = ClientScanner.MAX_BYTE_ARRAY;
334       }
335 
336       scan.setStartRow(localStartKey);
337       SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan,
338           scanMetrics, locateStartRow, controllerFactory, cacheNum, 0);
339       ScannerCallableWithReplicas scannerCallableWithReplicas =
340           new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan,
341               retries, scannerTimeout, cacheNum, conf, caller);
342       return scannerCallableWithReplicas;
343     }
344   }
345 }