View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
23  import java.io.IOException;
24  import java.util.concurrent.ExecutorService;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
35  import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
36  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
37  import org.apache.hadoop.hbase.util.Bytes;
38  
39  import com.google.common.annotations.VisibleForTesting;
40  
41  /**
42   * <p>
43   * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
44   * scan results, unless the results cross multiple regions or the row count of
45   * results exceed the caching.
46   * </p>
47   * For small scan, it will get better performance than {@link ReversedClientScanner}
48   */
49  @InterfaceAudience.Private
50  public class ClientSmallReversedScanner extends ReversedClientScanner {
51    private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
52    private ScannerCallableWithReplicas smallScanCallable = null;
53    private SmallScannerCallableFactory callableFactory;
54  
55    /**
56     * Create a new ReversibleClientScanner for the specified table. Take note that the passed
57     * {@link Scan} 's start row maybe changed changed.
58     *
59     * @param conf
60     *          The {@link Configuration} to use.
61     * @param scan
62     *          {@link Scan} to use in this scanner
63     * @param tableName
64     *          The table that we wish to rangeGet
65     * @param connection
66     *          Connection identifying the cluster
67     * @param rpcFactory
68     *          Factory used to create the {@link RpcRetryingCaller}
69     * @param controllerFactory
70     *          Factory used to access RPC payloads
71     * @param pool
72     *          Threadpool for RPC threads
73     * @param primaryOperationTimeout
74     *          Call timeout
75     * @throws IOException
76     *           If the remote call fails
77     */
78    public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
79        final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
80        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
81        throws IOException {
82      this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
83          primaryOperationTimeout, new SmallScannerCallableFactory());
84    }
85  
86    /**
87     * Create a new ReversibleClientScanner for the specified table. Take note that the passed
88     * {@link Scan}'s start row may be changed.
89     *
90     * @param conf
91     *          The {@link Configuration} to use.
92     * @param scan
93     *          {@link Scan} to use in this scanner
94     * @param tableName
95     *          The table that we wish to rangeGet
96     * @param connection
97     *          Connection identifying the cluster
98     * @param rpcFactory
99     *          Factory used to create the {@link RpcRetryingCaller}
100    * @param controllerFactory
101    *          Factory used to access RPC payloads
102    * @param pool
103    *          Threadpool for RPC threads
104    * @param primaryOperationTimeout
105    *          Call timeout
106    * @param callableFactory
107    *          Factory used to create the {@link SmallScannerCallable}
108    * @throws IOException
109    *           If the remote call fails
110    */
111   @VisibleForTesting
112   ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
113       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
114       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
115       SmallScannerCallableFactory callableFactory) throws IOException {
116     super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
117         primaryOperationTimeout);
118     this.callableFactory = callableFactory;
119   }
120 
121   /**
122    * Gets a scanner for following scan. Move to next region or continue from the last result or
123    * start from the start row.
124    *
125    * @param nbRows
126    * @param done
127    *          true if Server-side says we're done scanning.
128    * @param currentRegionDone
129    *          true if scan is over on current region
130    * @return true if has next scanner
131    * @throws IOException
132    */
133   private boolean nextScanner(int nbRows, final boolean done,
134                               boolean currentRegionDone) throws IOException {
135     // Where to start the next getter
136     byte[] localStartKey;
137     int cacheNum = nbRows;
138     boolean regionChanged = true;
139     // if we're at end of table, close and return false to stop iterating
140     if (this.currentRegion != null && currentRegionDone) {
141       byte[] startKey = this.currentRegion.getStartKey();
142       if (startKey == null
143           || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
144           || checkScanStopRow(startKey) || done) {
145         close();
146         if (LOG.isDebugEnabled()) {
147           LOG.debug("Finished with small scan at " + this.currentRegion);
148         }
149         return false;
150       }
151       // We take the row just under to get to the previous region.
152       localStartKey = createClosestRowBefore(startKey);
153       if (LOG.isDebugEnabled()) {
154         LOG.debug("Finished with region " + this.currentRegion);
155       }
156     } else if (this.lastResult != null) {
157       regionChanged = false;
158       localStartKey = createClosestRowBefore(lastResult.getRow());
159     } else {
160       localStartKey = this.scan.getStartRow();
161     }
162 
163     if (LOG.isTraceEnabled()) {
164       LOG.trace("Advancing internal small scanner to startKey at '"
165           + Bytes.toStringBinary(localStartKey) + "'");
166     }
167 
168     smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
169         getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
170         getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
171 
172     if (this.scanMetrics != null && regionChanged) {
173       this.scanMetrics.countOfRegions.incrementAndGet();
174     }
175     return true;
176   }
177 
178   @Override
179   public Result next() throws IOException {
180     // If the scanner is closed and there's nothing left in the cache, next is a
181     // no-op.
182     if (cache.size() == 0 && this.closed) {
183       return null;
184     }
185     if (cache.size() == 0) {
186       loadCache();
187     }
188 
189     if (cache.size() > 0) {
190       return cache.poll();
191     }
192     // if we exhausted this scanner before calling close, write out the scan
193     // metrics
194     writeScanMetrics();
195     return null;
196   }
197 
198   @Override
199   protected void loadCache() throws IOException {
200     Result[] values = null;
201     long remainingResultSize = maxScannerResultSize;
202     int countdown = this.caching;
203     boolean currentRegionDone = false;
204     // Values == null means server-side filter has determined we must STOP
205     while (remainingResultSize > 0 && countdown > 0
206         && nextScanner(countdown, values == null, currentRegionDone)) {
207       // Server returns a null values if scanning is to stop. Else,
208       // returns an empty array if scanning is to go on and we've just
209       // exhausted current region.
210       // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
211       // we do a callWithRetries
212       values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
213       this.currentRegion = smallScanCallable.getHRegionInfo();
214       long currentTime = System.currentTimeMillis();
215       if (this.scanMetrics != null) {
216         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
217             - lastNext);
218       }
219       lastNext = currentTime;
220       if (values != null && values.length > 0) {
221         for (int i = 0; i < values.length; i++) {
222           Result rs = values[i];
223           cache.add(rs);
224           // We don't make Iterator here
225           for (Cell cell : rs.rawCells()) {
226             remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
227           }
228           countdown--;
229           this.lastResult = rs;
230         }
231       }
232       if (smallScanCallable.hasMoreResultsContext()) {
233         currentRegionDone = !smallScanCallable.getServerHasMoreResults();
234       } else {
235         currentRegionDone = countdown > 0;
236       }
237     }
238   }
239 
240   @Override
241   protected void initializeScannerInConstruction() throws IOException {
242     // No need to initialize the scanner when constructing instance, do it when
243     // calling next(). Do nothing here.
244   }
245 
246   @Override
247   public void close() {
248     if (!scanMetricsPublished) writeScanMetrics();
249     closed = true;
250   }
251 
252   @VisibleForTesting
253   protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
254     this.callableFactory = callableFactory;
255   }
256 }