1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.hadoop.classification.InterfaceAudience;
23 import org.apache.hadoop.classification.InterfaceStability;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.HConstants;
26 import org.apache.hadoop.hbase.HRegionInfo;
27 import org.apache.hadoop.hbase.KeyValue;
28 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
29 import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
30 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
31 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
32 import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
33 import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
34 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
36 import org.apache.hadoop.hbase.util.Bytes;
37
38 import java.io.IOException;
39 import java.util.ArrayList;
40 import java.util.LinkedList;
41
42
43
44
45
46
47 @InterfaceAudience.Public
48 @InterfaceStability.Stable
49 public class ClientScanner extends AbstractClientScanner {
50 private final Log LOG = LogFactory.getLog(this.getClass());
51 private Scan scan;
52 private boolean closed = false;
53
54
55 private HRegionInfo currentRegion = null;
56 private ScannerCallable callable = null;
57 private final LinkedList<Result> cache = new LinkedList<Result>();
58 private final int caching;
59 private long lastNext;
60
61 private Result lastResult = null;
62 private ScanMetrics scanMetrics = null;
63 private final long maxScannerResultSize;
64 private final HConnection connection;
65 private final byte[] tableName;
66 private final int scannerTimeout;
67
68
69
70
71
72
73
74
75
76
77
78 public ClientScanner(final Configuration conf, final Scan scan,
79 final byte[] tableName) throws IOException {
80 this(conf, scan, tableName, HConnectionManager.getConnection(conf));
81 }
82
83
84
85
86
87
88
89
90
91
92
93 public ClientScanner(final Configuration conf, final Scan scan,
94 final byte[] tableName, HConnection connection) throws IOException {
95 if (LOG.isDebugEnabled()) {
96 LOG.debug("Creating scanner over "
97 + Bytes.toString(tableName)
98 + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
99 }
100 this.scan = scan;
101 this.tableName = tableName;
102 this.lastNext = System.currentTimeMillis();
103 this.connection = connection;
104 if (scan.getMaxResultSize() > 0) {
105 this.maxScannerResultSize = scan.getMaxResultSize();
106 } else {
107 this.maxScannerResultSize = conf.getLong(
108 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
109 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
110 }
111 this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
112 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
113
114
115 byte[] enableMetrics = scan.getAttribute(
116 Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
117 if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
118 scanMetrics = new ScanMetrics();
119 }
120
121
122 if (this.scan.getCaching() > 0) {
123 this.caching = this.scan.getCaching();
124 } else {
125 this.caching = conf.getInt(
126 HConstants.HBASE_CLIENT_SCANNER_CACHING,
127 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
128 }
129
130
131 nextScanner(this.caching, false);
132 }
133
134 protected HConnection getConnection() {
135 return this.connection;
136 }
137
138 protected byte[] getTableName() {
139 return this.tableName;
140 }
141
142 protected Scan getScan() {
143 return scan;
144 }
145
146 protected long getTimestamp() {
147 return lastNext;
148 }
149
150
151 private boolean checkScanStopRow(final byte [] endKey) {
152 if (this.scan.getStopRow().length > 0) {
153
154 byte [] stopRow = scan.getStopRow();
155 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
156 endKey, 0, endKey.length);
157 if (cmp <= 0) {
158
159
160 return true;
161 }
162 }
163 return false;
164 }
165
166
167
168
169
170
171
172
173
174
175 private boolean nextScanner(int nbRows, final boolean done)
176 throws IOException {
177
178 if (this.callable != null) {
179 this.callable.setClose();
180 callable.withRetries();
181 this.callable = null;
182 }
183
184
185 byte [] localStartKey;
186
187
188 if (this.currentRegion != null) {
189 byte [] endKey = this.currentRegion.getEndKey();
190 if (endKey == null ||
191 Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
192 checkScanStopRow(endKey) ||
193 done) {
194 close();
195 if (LOG.isDebugEnabled()) {
196 LOG.debug("Finished scanning region " + this.currentRegion);
197 }
198 return false;
199 }
200 localStartKey = endKey;
201 if (LOG.isDebugEnabled()) {
202 LOG.debug("Finished with region " + this.currentRegion);
203 }
204 } else {
205 localStartKey = this.scan.getStartRow();
206 }
207
208 if (LOG.isDebugEnabled()) {
209 LOG.debug("Advancing internal scanner to startKey at '" +
210 Bytes.toStringBinary(localStartKey) + "'");
211 }
212 try {
213 callable = getScannerCallable(localStartKey, nbRows);
214
215
216 callable.withRetries();
217 this.currentRegion = callable.getHRegionInfo();
218 if (this.scanMetrics != null) {
219 this.scanMetrics.countOfRegions.incrementAndGet();
220 }
221 } catch (IOException e) {
222 close();
223 throw e;
224 }
225 return true;
226 }
227
228 protected ScannerCallable getScannerCallable(byte [] localStartKey,
229 int nbRows) {
230 scan.setStartRow(localStartKey);
231 ScannerCallable s = new ScannerCallable(getConnection(),
232 getTableName(), scan, this.scanMetrics);
233 s.setCaching(nbRows);
234 return s;
235 }
236
237
238
239
240
241
242
243
244
245
246
247
248 private void writeScanMetrics() throws IOException {
249 if (this.scanMetrics == null) {
250 return;
251 }
252 MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
253 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
254 }
255
256 public Result next() throws IOException {
257
258 if (cache.size() == 0 && this.closed) {
259 return null;
260 }
261 if (cache.size() == 0) {
262 Result [] values = null;
263 long remainingResultSize = maxScannerResultSize;
264 int countdown = this.caching;
265
266
267 callable.setCaching(this.caching);
268
269
270 boolean skipFirst = false;
271 boolean retryAfterOutOfOrderException = true;
272 do {
273 try {
274 if (skipFirst) {
275
276
277 callable.setCaching(1);
278 values = callable.withRetries();
279 callable.setCaching(this.caching);
280 skipFirst = false;
281 }
282
283
284
285 values = callable.withRetries();
286 retryAfterOutOfOrderException = true;
287 } catch (DoNotRetryIOException e) {
288
289
290 if (e instanceof UnknownScannerException) {
291 long timeout = lastNext + scannerTimeout;
292
293
294
295 if (timeout < System.currentTimeMillis()) {
296 long elapsed = System.currentTimeMillis() - lastNext;
297 ScannerTimeoutException ex = new ScannerTimeoutException(
298 elapsed + "ms passed since the last invocation, " +
299 "timeout is currently set to " + scannerTimeout);
300 ex.initCause(e);
301 throw ex;
302 }
303 } else {
304
305
306 Throwable cause = e.getCause();
307 if ((cause != null && cause instanceof NotServingRegionException) ||
308 (cause != null && cause instanceof RegionServerStoppedException) ||
309 e instanceof OutOfOrderScannerNextException) {
310
311
312
313 } else {
314 throw e;
315 }
316 }
317
318 if (this.lastResult != null) {
319 this.scan.setStartRow(this.lastResult.getRow());
320
321
322 skipFirst = true;
323 }
324 if (e instanceof OutOfOrderScannerNextException) {
325 if (retryAfterOutOfOrderException) {
326 retryAfterOutOfOrderException = false;
327 } else {
328
329 throw new DoNotRetryIOException("Failed after retry of " +
330 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
331 }
332 }
333
334 this.currentRegion = null;
335
336
337 callable = null;
338
339 continue;
340 }
341 long currentTime = System.currentTimeMillis();
342 if (this.scanMetrics != null ) {
343 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
344 }
345 lastNext = currentTime;
346 if (values != null && values.length > 0) {
347 for (Result rs : values) {
348 cache.add(rs);
349 for (KeyValue kv : rs.raw()) {
350 remainingResultSize -= kv.heapSize();
351 }
352 countdown--;
353 this.lastResult = rs;
354 }
355 }
356
357 } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
358 }
359
360 if (cache.size() > 0) {
361 return cache.poll();
362 }
363
364
365 writeScanMetrics();
366 return null;
367 }
368
369
370
371
372
373
374
375
376
377
378 public Result [] next(int nbRows) throws IOException {
379
380 ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
381 for(int i = 0; i < nbRows; i++) {
382 Result next = next();
383 if (next != null) {
384 resultSets.add(next);
385 } else {
386 break;
387 }
388 }
389 return resultSets.toArray(new Result[resultSets.size()]);
390 }
391
392 public void close() {
393 if (callable != null) {
394 callable.setClose();
395 try {
396 callable.withRetries();
397 } catch (IOException e) {
398
399
400
401
402 } finally {
403
404 try {
405 writeScanMetrics();
406 } catch (IOException e) {
407
408 }
409 }
410 callable = null;
411 }
412 closed = true;
413 }
414 }