1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.hadoop.classification.InterfaceAudience;
23 import org.apache.hadoop.classification.InterfaceStability;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.HConstants;
26 import org.apache.hadoop.hbase.HRegionInfo;
27 import org.apache.hadoop.hbase.KeyValue;
28 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
29 import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
30 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
31 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
32 import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
33 import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
34 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
35 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
36 import org.apache.hadoop.hbase.util.Bytes;
37
38 import java.io.IOException;
39 import java.util.ArrayList;
40 import java.util.LinkedList;
41
42
43
44
45
46
47 @InterfaceAudience.Public
48 @InterfaceStability.Stable
49 public class ClientScanner extends AbstractClientScanner {
50 private final Log LOG = LogFactory.getLog(this.getClass());
51 private Scan scan;
52 private boolean closed = false;
53
54
55 private HRegionInfo currentRegion = null;
56 private ScannerCallable callable = null;
57 private final LinkedList<Result> cache = new LinkedList<Result>();
58 private final int caching;
59 private long lastNext;
60
61 private Result lastResult = null;
62 private ScanMetrics scanMetrics = null;
63 private final long maxScannerResultSize;
64 private final HConnection connection;
65 private final byte[] tableName;
66 private final int scannerTimeout;
67 private boolean scanMetricsPublished = false;
68
69
70
71
72
73
74
75
76
77
78
79 public ClientScanner(final Configuration conf, final Scan scan,
80 final byte[] tableName) throws IOException {
81 this(conf, scan, tableName, HConnectionManager.getConnection(conf));
82 }
83
84
85
86
87
88
89
90
91
92
93
94 public ClientScanner(final Configuration conf, final Scan scan,
95 final byte[] tableName, HConnection connection) throws IOException {
96 if (LOG.isDebugEnabled()) {
97 LOG.debug("Scan table=" + Bytes.toString(tableName)
98 + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
99 }
100 this.scan = scan;
101 this.tableName = tableName;
102 this.lastNext = System.currentTimeMillis();
103 this.connection = connection;
104 if (scan.getMaxResultSize() > 0) {
105 this.maxScannerResultSize = scan.getMaxResultSize();
106 } else {
107 this.maxScannerResultSize = conf.getLong(
108 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
109 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
110 }
111 this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
112 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
113
114
115 byte[] enableMetrics = scan.getAttribute(
116 Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
117 if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
118 scanMetrics = new ScanMetrics();
119 }
120
121
122 if (this.scan.getCaching() > 0) {
123 this.caching = this.scan.getCaching();
124 } else {
125 this.caching = conf.getInt(
126 HConstants.HBASE_CLIENT_SCANNER_CACHING,
127 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
128 }
129
130
131 nextScanner(false);
132 }
133
134 protected HConnection getConnection() {
135 return this.connection;
136 }
137
138 protected byte[] getTableName() {
139 return this.tableName;
140 }
141
142 protected Scan getScan() {
143 return scan;
144 }
145
146 protected long getTimestamp() {
147 return lastNext;
148 }
149
150
151 private boolean checkScanStopRow(final byte [] endKey) {
152 if (this.scan.getStopRow().length > 0) {
153
154 byte [] stopRow = scan.getStopRow();
155 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
156 endKey, 0, endKey.length);
157 if (cmp <= 0) {
158
159
160 return true;
161 }
162 }
163 return false;
164 }
165
166
167
168
169
170
171
172
173
174 private boolean nextScanner(final boolean done)
175 throws IOException {
176
177 if (this.callable != null) {
178 this.callable.setClose();
179 callable.withRetries();
180 this.callable = null;
181 }
182
183
184 byte [] localStartKey;
185
186
187 if (this.currentRegion != null) {
188 byte [] endKey = this.currentRegion.getEndKey();
189 if (endKey == null ||
190 Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
191 checkScanStopRow(endKey) ||
192 done) {
193 close();
194 if (LOG.isDebugEnabled()) {
195 LOG.debug("Finished region=" + this.currentRegion);
196 }
197 return false;
198 }
199 localStartKey = endKey;
200 if (LOG.isDebugEnabled()) {
201 LOG.debug("Finished with region " + this.currentRegion);
202 }
203 } else {
204 localStartKey = this.scan.getStartRow();
205 }
206
207 if (LOG.isDebugEnabled() && this.currentRegion != null) {
208
209 LOG.debug("Advancing internal scanner to startKey at '" +
210 Bytes.toStringBinary(localStartKey) + "'");
211 }
212 try {
213 callable = getScannerCallable(localStartKey);
214
215
216 callable.withRetries();
217 this.currentRegion = callable.getHRegionInfo();
218 if (this.scanMetrics != null) {
219 this.scanMetrics.countOfRegions.incrementAndGet();
220 }
221 } catch (IOException e) {
222 close();
223 throw e;
224 }
225 return true;
226 }
227
228 protected ScannerCallable getScannerCallable(byte [] localStartKey) {
229 scan.setStartRow(localStartKey);
230 ScannerCallable s = new ScannerCallable(getConnection(),
231 getTableName(), scan, this.scanMetrics);
232 s.setCaching(this.caching);
233 return s;
234 }
235
236
237
238
239
240
241
242
243
244
245
246
247 private void writeScanMetrics() {
248 if (this.scanMetrics == null || scanMetricsPublished) {
249 return;
250 }
251 MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
252 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
253 scanMetricsPublished = true;
254 }
255
256 public Result next() throws IOException {
257
258 if (cache.size() == 0 && this.closed) {
259 return null;
260 }
261 if (cache.size() == 0) {
262 Result [] values = null;
263 long remainingResultSize = maxScannerResultSize;
264 int countdown = this.caching;
265
266
267
268 boolean skipFirst = false;
269 boolean retryAfterOutOfOrderException = true;
270 do {
271 try {
272
273
274
275 values = callable.withRetries();
276 if (skipFirst && values != null && values.length == 1) {
277 skipFirst = false;
278 values = callable.withRetries();
279 }
280 retryAfterOutOfOrderException = true;
281 } catch (DoNotRetryIOException e) {
282
283
284 if (e instanceof UnknownScannerException) {
285 long timeout = lastNext + scannerTimeout;
286
287
288
289 if (timeout < System.currentTimeMillis()) {
290 long elapsed = System.currentTimeMillis() - lastNext;
291 ScannerTimeoutException ex = new ScannerTimeoutException(
292 elapsed + "ms passed since the last invocation, " +
293 "timeout is currently set to " + scannerTimeout);
294 ex.initCause(e);
295 throw ex;
296 }
297 } else {
298
299
300 Throwable cause = e.getCause();
301 if ((cause != null && cause instanceof NotServingRegionException) ||
302 (cause != null && cause instanceof RegionServerStoppedException) ||
303 e instanceof OutOfOrderScannerNextException) {
304
305
306
307 } else {
308 throw e;
309 }
310 }
311
312 if (this.lastResult != null) {
313 this.scan.setStartRow(this.lastResult.getRow());
314
315
316 skipFirst = true;
317 }
318 if (e instanceof OutOfOrderScannerNextException) {
319 if (retryAfterOutOfOrderException) {
320 retryAfterOutOfOrderException = false;
321 } else {
322
323 throw new DoNotRetryIOException("Failed after retry of " +
324 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
325 }
326 }
327
328 this.currentRegion = null;
329
330
331 callable = null;
332
333 continue;
334 }
335 long currentTime = System.currentTimeMillis();
336 if (this.scanMetrics != null ) {
337 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
338 }
339 lastNext = currentTime;
340 if (values != null && values.length > 0) {
341 int i = 0;
342 if (skipFirst) {
343 skipFirst = false;
344
345 countdown--;
346 i = 1;
347 }
348 for (; i < values.length; i++) {
349 Result rs = values[i];
350 cache.add(rs);
351 for (KeyValue kv : rs.raw()) {
352 remainingResultSize -= kv.heapSize();
353 }
354 countdown--;
355 this.lastResult = rs;
356 }
357 }
358
359 } while (remainingResultSize > 0 && countdown > 0 && nextScanner(values == null));
360 }
361
362 if (cache.size() > 0) {
363 return cache.poll();
364 }
365
366
367 writeScanMetrics();
368 return null;
369 }
370
371
372
373
374
375
376
377
378
379
380 public Result [] next(int nbRows) throws IOException {
381
382 ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
383 for(int i = 0; i < nbRows; i++) {
384 Result next = next();
385 if (next != null) {
386 resultSets.add(next);
387 } else {
388 break;
389 }
390 }
391 return resultSets.toArray(new Result[resultSets.size()]);
392 }
393
394 public void close() {
395 if (!scanMetricsPublished) writeScanMetrics();
396 if (callable != null) {
397 callable.setClose();
398 try {
399 callable.withRetries();
400 } catch (IOException e) {
401
402
403
404
405 }
406 callable = null;
407 }
408 closed = true;
409 }
410
411 long currentScannerId() {
412 return (callable == null) ? -1L : callable.scannerId;
413 }
414
415 HRegionInfo currentRegionInfo() {
416 return currentRegion;
417 }
418 }