1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.concurrent.ExecutorService;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
34 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.RequestConverter;
37 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
39 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
40 import org.apache.hadoop.hbase.util.Bytes;
41
42 import com.google.common.annotations.VisibleForTesting;
43 import com.google.protobuf.ServiceException;
44
45
46
47
48
49
50
51
52 @InterfaceAudience.Private
53 public class ClientSmallScanner extends ClientScanner {
54 private static final Log LOG = LogFactory.getLog(ClientSmallScanner.class);
55 private ScannerCallableWithReplicas smallScanCallable = null;
56 private SmallScannerCallableFactory callableFactory;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
82 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
83 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
84 throws IOException {
85 this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
86 primaryOperationTimeout, new SmallScannerCallableFactory());
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 @VisibleForTesting
115 ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
116 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
117 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
118 SmallScannerCallableFactory callableFactory) throws IOException {
119 super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
120 primaryOperationTimeout);
121 this.callableFactory = callableFactory;
122 }
123
124 @Override
125 protected void initializeScannerInConstruction() throws IOException {
126
127
128 }
129
130
131
132
133
134
135
136
137
138
139 private boolean nextScanner(int nbRows, final boolean done,
140 boolean currentRegionDone) throws IOException {
141
142 byte[] localStartKey;
143 int cacheNum = nbRows;
144 boolean regionChanged = true;
145
146 if (this.currentRegion != null && currentRegionDone) {
147 byte[] endKey = this.currentRegion.getEndKey();
148 if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
149 || checkScanStopRow(endKey) || done) {
150 close();
151 if (LOG.isTraceEnabled()) {
152 LOG.trace("Finished with small scan at " + this.currentRegion);
153 }
154 return false;
155 }
156 localStartKey = endKey;
157 if (LOG.isTraceEnabled()) {
158 LOG.trace("Finished with region " + this.currentRegion);
159 }
160 } else if (this.lastResult != null) {
161 regionChanged = false;
162 localStartKey = Bytes.add(lastResult.getRow(), new byte[1]);
163 } else {
164 localStartKey = this.scan.getStartRow();
165 }
166
167 if (LOG.isTraceEnabled()) {
168 LOG.trace("Advancing internal small scanner to startKey at '"
169 + Bytes.toStringBinary(localStartKey) + "'");
170 }
171 smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
172 getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
173 getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
174 if (this.scanMetrics != null && regionChanged) {
175 this.scanMetrics.countOfRegions.incrementAndGet();
176 }
177 return true;
178 }
179
180 static class SmallScannerCallable extends ScannerCallable {
181 public SmallScannerCallable(
182 ClusterConnection connection, TableName table, Scan scan,
183 ScanMetrics scanMetrics, RpcControllerFactory controllerFactory, int caching, int id) {
184 super(connection, table, scan, scanMetrics, controllerFactory, id);
185 this.setCaching(caching);
186 }
187
188 @Override
189 public Result[] call(int timeout) throws IOException {
190 if (this.closed) return null;
191 if (Thread.interrupted()) {
192 throw new InterruptedIOException();
193 }
194 ScanRequest request = RequestConverter.buildScanRequest(getLocation()
195 .getRegionInfo().getRegionName(), getScan(), getCaching(), true);
196 ScanResponse response = null;
197 controller = controllerFactory.newController();
198 try {
199 controller.setPriority(getTableName());
200 controller.setCallTimeout(timeout);
201 response = getStub().scan(controller, request);
202 Result[] results = ResponseConverter.getResults(controller.cellScanner(),
203 response);
204 if (response.hasMoreResultsInRegion()) {
205 setHasMoreResultsContext(true);
206 setServerHasMoreResults(response.getMoreResultsInRegion());
207 } else {
208 setHasMoreResultsContext(false);
209 }
210
211 updateResultsMetrics(results);
212 return results;
213 } catch (ServiceException se) {
214 throw ProtobufUtil.getRemoteException(se);
215 }
216 }
217
218 @Override
219 public ScannerCallable getScannerCallableForReplica(int id) {
220 return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(),
221 scanMetrics, controllerFactory, getCaching(), id);
222 }
223 }
224
225 @Override
226 public Result next() throws IOException {
227
228
229 if (cache.size() == 0 && this.closed) {
230 return null;
231 }
232 if (cache.size() == 0) {
233 loadCache();
234 }
235
236 if (cache.size() > 0) {
237 return cache.poll();
238 }
239
240
241 writeScanMetrics();
242 return null;
243 }
244
245 @Override
246 protected void loadCache() throws IOException {
247 Result[] values = null;
248 long remainingResultSize = maxScannerResultSize;
249 int countdown = this.caching;
250 boolean currentRegionDone = false;
251
252 while (remainingResultSize > 0 && countdown > 0
253 && nextScanner(countdown, values == null, currentRegionDone)) {
254
255
256
257
258
259 values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
260 this.currentRegion = smallScanCallable.getHRegionInfo();
261 long currentTime = System.currentTimeMillis();
262 if (this.scanMetrics != null) {
263 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
264 - lastNext);
265 }
266 lastNext = currentTime;
267 if (values != null && values.length > 0) {
268 for (int i = 0; i < values.length; i++) {
269 Result rs = values[i];
270 cache.add(rs);
271
272 for (Cell cell : rs.rawCells()) {
273 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
274 }
275 countdown--;
276 this.lastResult = rs;
277 }
278 }
279 if (smallScanCallable.hasMoreResultsContext()) {
280
281 currentRegionDone = !smallScanCallable.getServerHasMoreResults();
282 } else {
283
284 currentRegionDone = countdown > 0;
285 }
286 }
287 }
288
289 public void close() {
290 if (!scanMetricsPublished) writeScanMetrics();
291 closed = true;
292 }
293
294 @VisibleForTesting
295 protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
296 this.callableFactory = callableFactory;
297 }
298
299 @InterfaceAudience.Private
300 protected static class SmallScannerCallableFactory {
301
302 public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
303 Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
304 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
305 int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller) {
306 scan.setStartRow(localStartKey);
307 SmallScannerCallable s = new SmallScannerCallable(
308 connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0);
309 ScannerCallableWithReplicas scannerCallableWithReplicas =
310 new ScannerCallableWithReplicas(table, connection,
311 s, pool, primaryOperationTimeout, scan, retries,
312 scannerTimeout, cacheNum, conf, caller);
313 return scannerCallableWithReplicas;
314 }
315
316 }
317 }