View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
32  import org.apache.hadoop.hbase.util.Bytes;
33  
34  import java.io.IOException;
35  import java.util.concurrent.ExecutorService;
36  
37  /**
38   * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
39   * scan results, unless the results cross multiple regions or the row count of
40   * results exceed the caching.
41   * <p/>
42   * For small scan, it will get better performance than {@link ReversedClientScanner}
43   */
44  @InterfaceAudience.Private
45  public class ClientSmallReversedScanner extends ReversedClientScanner {
46    private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
47    private ScannerCallableWithReplicas smallScanCallable = null;
48    private byte[] skipRowOfFirstResult = null;
49  
50    /**
51     * Create a new ReversibleClientScanner for the specified table Note that the
52     * passed {@link Scan}'s start row maybe changed changed.
53     *
54     * @param conf The {@link Configuration} to use.
55     * @param scan {@link Scan} to use in this scanner
56     * @param tableName The table that we wish to rangeGet
57     * @param connection Connection identifying the cluster
58     * @param rpcFactory
59     * @throws IOException
60     */
61    public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
62        final TableName tableName, ClusterConnection connection,
63        RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory,
64        ExecutorService pool, int primaryOperationTimeout) throws IOException {
65      super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout);
66    }
67  
68    /**
69     * Gets a scanner for following scan. Move to next region or continue from the
70     * last result or start from the start row.
71     *
72     * @param nbRows
73     * @param done              true if Server-side says we're done scanning.
74     * @param currentRegionDone true if scan is over on current region
75     * @return true if has next scanner
76     * @throws IOException
77     */
78    private boolean nextScanner(int nbRows, final boolean done,
79                                boolean currentRegionDone) throws IOException {
80      // Where to start the next getter
81      byte[] localStartKey;
82      int cacheNum = nbRows;
83      skipRowOfFirstResult = null;
84      // if we're at end of table, close and return false to stop iterating
85      if (this.currentRegion != null && currentRegionDone) {
86        byte[] startKey = this.currentRegion.getStartKey();
87        if (startKey == null
88            || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
89            || checkScanStopRow(startKey) || done) {
90          close();
91          if (LOG.isDebugEnabled()) {
92            LOG.debug("Finished with small scan at " + this.currentRegion);
93          }
94          return false;
95        }
96        // We take the row just under to get to the previous region.
97        localStartKey = createClosestRowBefore(startKey);
98        if (LOG.isDebugEnabled()) {
99          LOG.debug("Finished with region " + this.currentRegion);
100       }
101     } else if (this.lastResult != null) {
102       localStartKey = this.lastResult.getRow();
103       skipRowOfFirstResult = this.lastResult.getRow();
104       cacheNum++;
105     } else {
106       localStartKey = this.scan.getStartRow();
107     }
108 
109     if (LOG.isTraceEnabled()) {
110       LOG.trace("Advancing internal small scanner to startKey at '"
111           + Bytes.toStringBinary(localStartKey) + "'");
112     }
113 
114     smallScanCallable = ClientSmallScanner.getSmallScanCallable(
115       getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum,
116       rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
117       getRetries(), getScannerTimeout(), getConf(), caller);
118 
119     if (this.scanMetrics != null && skipRowOfFirstResult == null) {
120       this.scanMetrics.countOfRegions.incrementAndGet();
121     }
122     return true;
123   }
124 
125   @Override
126   public Result next() throws IOException {
127     // If the scanner is closed and there's nothing left in the cache, next is a
128     // no-op.
129     if (cache.size() == 0 && this.closed) {
130       return null;
131     }
132     if (cache.size() == 0) {
133       Result[] values = null;
134       long remainingResultSize = maxScannerResultSize;
135       int countdown = this.caching;
136       boolean currentRegionDone = false;
137       // Values == null means server-side filter has determined we must STOP
138       while (remainingResultSize > 0 && countdown > 0
139           && nextScanner(countdown, values == null, currentRegionDone)) {
140         // Server returns a null values if scanning is to stop. Else,
141         // returns an empty array if scanning is to go on and we've just
142         // exhausted current region.
143         // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
144         // we do a callWithRetries
145         values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
146         this.currentRegion = smallScanCallable.getHRegionInfo();
147         long currentTime = System.currentTimeMillis();
148         if (this.scanMetrics != null) {
149           this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
150               - lastNext);
151         }
152         lastNext = currentTime;
153         if (values != null && values.length > 0) {
154           for (int i = 0; i < values.length; i++) {
155             Result rs = values[i];
156             if (i == 0 && this.skipRowOfFirstResult != null
157                 && Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
158               // Skip the first result
159               continue;
160             }
161             cache.add(rs);
162             // We don't make Iterator here
163             for (Cell cell : rs.rawCells()) {
164               remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
165             }
166             countdown--;
167             this.lastResult = rs;
168           }
169         }
170         currentRegionDone = countdown > 0;
171       }
172     }
173 
174     if (cache.size() > 0) {
175       return cache.poll();
176     }
177     // if we exhausted this scanner before calling close, write out the scan
178     // metrics
179     writeScanMetrics();
180     return null;
181   }
182 
183 
184   @Override
185   protected void initializeScannerInConstruction() throws IOException {
186     // No need to initialize the scanner when constructing instance, do it when
187     // calling next(). Do nothing here.
188   }
189 
190   @Override
191   public void close() {
192     if (!scanMetricsPublished) writeScanMetrics();
193     closed = true;
194   }
195 
196 }