View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
32  import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
33  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
34  import org.apache.hadoop.hbase.util.Bytes;
35  
36  import com.google.common.annotations.VisibleForTesting;
37  
38  import java.io.IOException;
39  import java.util.concurrent.ExecutorService;
40  
41  /**
42   * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
43   * scan results, unless the results cross multiple regions or the row count of
44   * results exceed the caching.
45   * <p/>
46   * For small scan, it will get better performance than {@link ReversedClientScanner}
47   */
48  @InterfaceAudience.Private
49  public class ClientSmallReversedScanner extends ReversedClientScanner {
50    private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
51    private ScannerCallableWithReplicas smallScanCallable = null;
52    private SmallScannerCallableFactory callableFactory;
53  
54    /**
55     * Create a new ReversibleClientScanner for the specified table. Take note that the passed
56     * {@link Scan} 's start row maybe changed changed.
57     *
58     * @param conf
59     *          The {@link Configuration} to use.
60     * @param scan
61     *          {@link Scan} to use in this scanner
62     * @param tableName
63     *          The table that we wish to rangeGet
64     * @param connection
65     *          Connection identifying the cluster
66     * @param rpcFactory
67     *          Factory used to create the {@link RpcRetryingCaller}
68     * @param controllerFactory
69     *          Factory used to access RPC payloads
70     * @param pool
71     *          Threadpool for RPC threads
72     * @param primaryOperationTimeout
73     *          Call timeout
74     * @throws IOException
75     *           If the remote call fails
76     */
77    public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
78        final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
79        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
80        throws IOException {
81      this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
82          primaryOperationTimeout, new SmallScannerCallableFactory());
83    }
84  
85    /**
86     * Create a new ReversibleClientScanner for the specified table. Take note that the passed
87     * {@link Scan}'s start row may be changed.
88     *
89     * @param conf
90     *          The {@link Configuration} to use.
91     * @param scan
92     *          {@link Scan} to use in this scanner
93     * @param tableName
94     *          The table that we wish to rangeGet
95     * @param connection
96     *          Connection identifying the cluster
97     * @param rpcFactory
98     *          Factory used to create the {@link RpcRetryingCaller}
99     * @param controllerFactory
100    *          Factory used to access RPC payloads
101    * @param pool
102    *          Threadpool for RPC threads
103    * @param primaryOperationTimeout
104    *          Call timeout
105    * @param callableFactory
106    *          Factory used to create the {@link SmallScannerCallable}
107    * @throws IOException
108    *           If the remote call fails
109    */
110   @VisibleForTesting
111   ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
112       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
113       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
114       SmallScannerCallableFactory callableFactory) throws IOException {
115     super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
116         primaryOperationTimeout);
117     this.callableFactory = callableFactory;
118   }
119 
120   /**
121    * Gets a scanner for following scan. Move to next region or continue from the last result or
122    * start from the start row.
123    *
124    * @param nbRows
125    * @param done
126    *          true if Server-side says we're done scanning.
127    * @param currentRegionDone
128    *          true if scan is over on current region
129    * @return true if has next scanner
130    * @throws IOException
131    */
132   private boolean nextScanner(int nbRows, final boolean done,
133                               boolean currentRegionDone) throws IOException {
134     // Where to start the next getter
135     byte[] localStartKey;
136     int cacheNum = nbRows;
137     boolean regionChanged = true;
138     // if we're at end of table, close and return false to stop iterating
139     if (this.currentRegion != null && currentRegionDone) {
140       byte[] startKey = this.currentRegion.getStartKey();
141       if (startKey == null
142           || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
143           || checkScanStopRow(startKey) || done) {
144         close();
145         if (LOG.isDebugEnabled()) {
146           LOG.debug("Finished with small scan at " + this.currentRegion);
147         }
148         return false;
149       }
150       // We take the row just under to get to the previous region.
151       localStartKey = createClosestRowBefore(startKey);
152       if (LOG.isDebugEnabled()) {
153         LOG.debug("Finished with region " + this.currentRegion);
154       }
155     } else if (this.lastResult != null) {
156       regionChanged = false;
157       localStartKey = createClosestRowBefore(lastResult.getRow());
158     } else {
159       localStartKey = this.scan.getStartRow();
160     }
161 
162     if (LOG.isTraceEnabled()) {
163       LOG.trace("Advancing internal small scanner to startKey at '"
164           + Bytes.toStringBinary(localStartKey) + "'");
165     }
166 
167     smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
168         getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
169         getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
170 
171     if (this.scanMetrics != null && regionChanged) {
172       this.scanMetrics.countOfRegions.incrementAndGet();
173     }
174     return true;
175   }
176 
177   @Override
178   public Result next() throws IOException {
179     // If the scanner is closed and there's nothing left in the cache, next is a
180     // no-op.
181     if (cache.size() == 0 && this.closed) {
182       return null;
183     }
184     if (cache.size() == 0) {
185       loadCache();
186     }
187 
188     if (cache.size() > 0) {
189       return cache.poll();
190     }
191     // if we exhausted this scanner before calling close, write out the scan
192     // metrics
193     writeScanMetrics();
194     return null;
195   }
196 
197   @Override
198   protected void loadCache() throws IOException {
199     Result[] values = null;
200     long remainingResultSize = maxScannerResultSize;
201     int countdown = this.caching;
202     boolean currentRegionDone = false;
203     // Values == null means server-side filter has determined we must STOP
204     while (remainingResultSize > 0 && countdown > 0
205         && nextScanner(countdown, values == null, currentRegionDone)) {
206       // Server returns a null values if scanning is to stop. Else,
207       // returns an empty array if scanning is to go on and we've just
208       // exhausted current region.
209       // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
210       // we do a callWithRetries
211       values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
212       this.currentRegion = smallScanCallable.getHRegionInfo();
213       long currentTime = System.currentTimeMillis();
214       if (this.scanMetrics != null) {
215         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
216             - lastNext);
217       }
218       lastNext = currentTime;
219       if (values != null && values.length > 0) {
220         for (int i = 0; i < values.length; i++) {
221           Result rs = values[i];
222           cache.add(rs);
223           // We don't make Iterator here
224           for (Cell cell : rs.rawCells()) {
225             remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
226           }
227           countdown--;
228           this.lastResult = rs;
229         }
230       }
231       if (smallScanCallable.hasMoreResultsContext()) {
232         currentRegionDone = !smallScanCallable.getServerHasMoreResults();
233       } else {
234         currentRegionDone = countdown > 0;
235       }
236     }
237   }
238 
239   @Override
240   protected void initializeScannerInConstruction() throws IOException {
241     // No need to initialize the scanner when constructing instance, do it when
242     // calling next(). Do nothing here.
243   }
244 
245   @Override
246   public void close() {
247     if (!scanMetricsPublished) writeScanMetrics();
248     closed = true;
249   }
250 
251   @VisibleForTesting
252   protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
253     this.callableFactory = callableFactory;
254   }
255 }