View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.HConstants;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
29  import org.apache.hadoop.hbase.util.Bytes;
30  
31  import com.google.protobuf.ServiceException;
32  
33  /**
34   * Client scanner for small scan. Generally, only one RPC is called to fetch the
35   * scan results, unless the results cross multiple regions or the row count of
36   * results excess the caching.
37   * 
38   * For small scan, it will get better performance than {@link ClientScanner}
39   */
40  public class ClientSmallScanner extends ClientScanner {
41    private final Log LOG = LogFactory.getLog(this.getClass());
42    private ServerCallable<Result[]> smallScanCallable = null;
43    // When fetching results from server, skip the first result if it has the same
44    // row with this one
45    private byte[] skipRowOfFirstResult = null;
46  
47    /**
48     * Create a new ClientSmallScanner for the specified table. An HConnection
49     * will be retrieved using the passed Configuration. Note that the passed
50     * {@link Scan} 's start row maybe changed.
51     * 
52     * @param conf The {@link Configuration} to use.
53     * @param scan {@link Scan} to use in this scanner
54     * @param tableName The table that we wish to rangeGet
55     * @throws IOException
56     */
57    public ClientSmallScanner(final Configuration conf, final Scan scan,
58        final byte[] tableName) throws IOException {
59      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
60    }
61  
62    /**
63     * Create a new ClientSmallScanner for the specified table. An HConnection
64     * will be retrieved using the passed Configuration. Note that the passed
65     * {@link Scan} 's start row maybe changed.
66     * @param conf
67     * @param scan
68     * @param tableName
69     * @param connection
70     * @throws IOException
71     */
72    public ClientSmallScanner(final Configuration conf, final Scan scan,
73        final byte[] tableName, HConnection connection) throws IOException {
74      super(conf, scan, tableName, connection);
75    }
76  
77    @Override
78    protected void initializeScannerInConstruction() throws IOException {
79      // No need to initialize the scanner when constructing instance, do it when
80      // calling next(). Do nothing here.
81    }
82  
83    /**
84     * Gets a scanner for following scan. Move to next region or continue from the
85     * last result or start from the start row.
86     * @param nbRows
87     * @param done true if Server-side says we're done scanning.
88     * @param currentRegionDone true if scan is over on current region
89     * @return true if has next scanner
90     * @throws IOException
91     */
92    private boolean nextScanner(int nbRows, final boolean done,
93        boolean currentRegionDone) throws IOException {
94      // Where to start the next getter
95      byte[] localStartKey;
96      int cacheNum = nbRows;
97      skipRowOfFirstResult = null;
98      // if we're at end of table, close and return false to stop iterating
99      if (this.currentRegion != null && currentRegionDone) {
100       byte[] endKey = this.currentRegion.getEndKey();
101       if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
102           || checkScanStopRow(endKey) || done) {
103         close();
104         if (LOG.isDebugEnabled()) {
105           LOG.debug("Finished with small scan at " + this.currentRegion);
106         }
107         return false;
108       }
109       localStartKey = endKey;
110       if (LOG.isDebugEnabled()) {
111         LOG.debug("Finished with region " + this.currentRegion);
112       }
113     } else if (this.lastResult != null) {
114       localStartKey = this.lastResult.getRow();
115       skipRowOfFirstResult = this.lastResult.getRow();
116       cacheNum++;
117     } else {
118       localStartKey = this.scan.getStartRow();
119     }
120 
121     if (LOG.isDebugEnabled()) {
122       LOG.debug("Advancing internal small scanner to startKey at '"
123           + Bytes.toStringBinary(localStartKey) + "'");
124     }
125     smallScanCallable = getSmallScanCallable(localStartKey, cacheNum);
126     if (this.scanMetrics != null && skipRowOfFirstResult == null) {
127       this.scanMetrics.countOfRegions.inc();
128     }
129     return true;
130   }
131 
132   private ServerCallable<Result[]> getSmallScanCallable(byte[] localStartKey,
133       final int nbRows) {
134     this.scan.setStartRow(localStartKey);
135     ServerCallable<Result[]> callable = new ServerCallable<Result[]>(
136         getConnection(), getTableName(), scan.getStartRow()) {
137       public Result[] call() throws IOException {
138         return server.scan(location.getRegionInfo().getRegionName(), scan,
139             nbRows);
140       }
141     };
142     return callable;
143   }
144 
145   @Override
146   public Result next() throws IOException {
147     // If the scanner is closed and there's nothing left in the cache, next is a
148     // no-op.
149     if (cache.size() == 0 && this.closed) {
150       return null;
151     }
152     if (cache.size() == 0) {
153       Result[] values = null;
154       long remainingResultSize = maxScannerResultSize;
155       int countdown = this.caching;
156       boolean currentRegionDone = false;
157       // Values == null means server-side filter has determined we must STOP
158       while (remainingResultSize > 0 && countdown > 0
159           && nextScanner(countdown, values == null, currentRegionDone)) {
160         // Server returns a null values if scanning is to stop. Else,
161         // returns an empty array if scanning is to go on and we've just
162         // exhausted current region.
163         values = smallScanCallable.withRetries();
164         this.currentRegion = smallScanCallable.getHRegionInfo();
165         long currentTime = System.currentTimeMillis();
166         if (this.scanMetrics != null) {
167           this.scanMetrics.sumOfMillisSecBetweenNexts.inc(currentTime
168               - lastNext);
169         }
170         lastNext = currentTime;
171         if (values != null && values.length > 0) {
172           for (int i = 0; i < values.length; i++) {
173             Result rs = values[i];
174             if (i == 0 && this.skipRowOfFirstResult != null
175                 && Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
176               // Skip the first result
177               continue;
178             }
179             cache.add(rs);
180             for (KeyValue kv : rs.raw()) {
181               remainingResultSize -= kv.heapSize();
182             }
183             countdown--;
184             this.lastResult = rs;
185           }
186         }
187         currentRegionDone = countdown > 0;
188       }
189     }
190 
191     if (cache.size() > 0) {
192       return cache.poll();
193     }
194     // if we exhausted this scanner before calling close, write out the scan
195     // metrics
196     writeScanMetrics();
197     return null;
198   }
199 
200   @Override
201   public void close() {
202     closed = true;
203     try {
204       writeScanMetrics();
205     } catch (IOException e) {
206       // As ClientScanner#close, we don't want the scanner close() method to
207       // throw.
208     }
209   }
210 }