1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.KeyValue.MetaComparator;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.CellComparator;
35 import org.apache.hadoop.hbase.CellUtil;
36 import org.apache.hadoop.hbase.DoNotRetryIOException;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.NotServingRegionException;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.UnknownScannerException;
43 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
44 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
45 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
48 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
49 import org.apache.hadoop.hbase.util.Bytes;
50
51 import com.google.common.annotations.VisibleForTesting;
52
53 /**
54 * Implements the scanner interface for the HBase client.
55 * If there are multiple regions in a table, this scanner will iterate
56 * through them all.
57 */
58 @InterfaceAudience.Private
59 public class ClientScanner extends AbstractClientScanner {
60 private static final Log LOG = LogFactory.getLog(ClientScanner.class);
61 // A byte array in which all elements are the max byte, and it is used to
62 // construct closest front row
63 static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
64 protected Scan scan;
65 protected boolean closed = false;
66 // Current region scanner is against. Gets cleared if current region goes
67 // wonky: e.g. if it splits on us.
68 protected HRegionInfo currentRegion = null;
69 protected ScannerCallableWithReplicas callable = null;
70 protected final LinkedList<Result> cache = new LinkedList<Result>();
71 /**
72 * A list of partial results that have been returned from the server. This list should only
73 * contain results if this scanner does not have enough partial results to form the complete
74 * result.
75 */
76 protected final LinkedList<Result> partialResults = new LinkedList<Result>();
77 /**
78 * The row for which we are accumulating partial Results (i.e. the row of the Results stored
79 * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync
80 * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
81 */
82 protected byte[] partialResultsRow = null;
83 /**
84 * The last cell from a not full Row which is added to cache
85 */
86 protected Cell lastCellLoadedToCache = null;
87 protected final int caching;
88 protected long lastNext;
89 // Keep lastResult returned successfully in case we have to reset scanner.
90 protected Result lastResult = null;
91 protected final long maxScannerResultSize;
92 private final ClusterConnection connection;
93 private final TableName tableName;
94 protected final int scannerTimeout;
95 protected boolean scanMetricsPublished = false;
96 protected RpcRetryingCaller<Result []> caller;
97 protected RpcControllerFactory rpcControllerFactory;
98 protected Configuration conf;
99 //The timeout on the primary. Applicable if there are multiple replicas for a region
100 //In that case, we will only wait for this much timeout on the primary before going
101 //to the replicas and trying the same scan. Note that the retries will still happen
102 //on each replica and the first successful results will be taken. A timeout of 0 is
103 //disallowed.
104 protected final int primaryOperationTimeout;
105 private int retries;
106 protected final ExecutorService pool;
107 private static MetaComparator metaComparator = new MetaComparator();
108
109 /**
110 * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
111 * row maybe changed changed.
112 * @param conf The {@link Configuration} to use.
113 * @param scan {@link Scan} to use in this scanner
114 * @param tableName The table that we wish to scan
115 * @param connection Connection identifying the cluster
116 * @throws IOException
117 */
118 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
119 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
120 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
121 throws IOException {
122 if (LOG.isTraceEnabled()) {
123 LOG.trace("Scan table=" + tableName
124 + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
125 }
126 this.scan = scan;
127 this.tableName = tableName;
128 this.lastNext = System.currentTimeMillis();
129 this.connection = connection;
130 this.pool = pool;
131 this.primaryOperationTimeout = primaryOperationTimeout;
132 this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
133 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
134 if (scan.getMaxResultSize() > 0) {
135 this.maxScannerResultSize = scan.getMaxResultSize();
136 } else {
137 this.maxScannerResultSize = conf.getLong(
138 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
139 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
140 }
141 this.scannerTimeout = HBaseConfiguration.getInt(conf,
142 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
143 HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
144 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
145
146 // check if application wants to collect scan metrics
147 initScanMetrics(scan);
148
149 // Use the caching from the Scan. If not set, use the default cache setting for this table.
150 if (this.scan.getCaching() > 0) {
151 this.caching = this.scan.getCaching();
152 } else {
153 this.caching = conf.getInt(
154 HConstants.HBASE_CLIENT_SCANNER_CACHING,
155 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
156 }
157
158 this.caller = rpcFactory.<Result[]> newCaller();
159 this.rpcControllerFactory = controllerFactory;
160
161 this.conf = conf;
162 initializeScannerInConstruction();
163 }
164
165 protected void initializeScannerInConstruction() throws IOException{
166 // initialize the scanner
167 nextScanner(this.caching, false);
168 }
169
170 protected ClusterConnection getConnection() {
171 return this.connection;
172 }
173
174 /**
175 * @return Table name
176 * @deprecated As of release 0.96
177 * (<a href="https://issues.apache.org/jira/browse/HBASE-9508">HBASE-9508</a>).
178 * This will be removed in HBase 2.0.0. Use {@link #getTable()}.
179 */
180 @Deprecated
181 protected byte [] getTableName() {
182 return this.tableName.getName();
183 }
184
185 protected TableName getTable() {
186 return this.tableName;
187 }
188
189 protected int getRetries() {
190 return this.retries;
191 }
192
193 protected int getScannerTimeout() {
194 return this.scannerTimeout;
195 }
196
197 protected Configuration getConf() {
198 return this.conf;
199 }
200
201 protected Scan getScan() {
202 return scan;
203 }
204
205 protected ExecutorService getPool() {
206 return pool;
207 }
208
209 protected int getPrimaryOperationTimeout() {
210 return primaryOperationTimeout;
211 }
212
213 protected int getCaching() {
214 return caching;
215 }
216
217 protected long getTimestamp() {
218 return lastNext;
219 }
220
221 @VisibleForTesting
222 protected long getMaxResultSize() {
223 return maxScannerResultSize;
224 }
225
226 // returns true if the passed region endKey
227 protected boolean checkScanStopRow(final byte [] endKey) {
228 if (this.scan.getStopRow().length > 0) {
229 // there is a stop row, check to see if we are past it.
230 byte [] stopRow = scan.getStopRow();
231 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
232 endKey, 0, endKey.length);
233 if (cmp <= 0) {
234 // stopRow <= endKey (endKey is equals to or larger than stopRow)
235 // This is a stop.
236 return true;
237 }
238 }
239 return false; //unlikely.
240 }
241
242 private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
243 // If we have just switched replica, don't go to the next scanner yet. Rather, try
244 // the scanner operations on the new replica, from the right point in the scan
245 // Note that when we switched to a different replica we left it at a point
246 // where we just did the "openScanner" with the appropriate startrow
247 if (callable != null && callable.switchedToADifferentReplica()) return true;
248 return nextScanner(nbRows, done);
249 }
250
251 /*
252 * Gets a scanner for the next region. If this.currentRegion != null, then
253 * we will move to the endrow of this.currentRegion. Else we will get
254 * scanner at the scan.getStartRow(). We will go no further, just tidy
255 * up outstanding scanners, if <code>currentRegion != null</code> and
256 * <code>done</code> is true.
257 * @param nbRows
258 * @param done Server-side says we're done scanning.
259 */
260 protected boolean nextScanner(int nbRows, final boolean done)
261 throws IOException {
262 // Close the previous scanner if it's open
263 if (this.callable != null) {
264 this.callable.setClose();
265 call(callable, caller, scannerTimeout);
266 this.callable = null;
267 }
268
269 // Where to start the next scanner
270 byte [] localStartKey;
271
272 // if we're at end of table, close and return false to stop iterating
273 if (this.currentRegion != null) {
274 byte [] endKey = this.currentRegion.getEndKey();
275 if (endKey == null ||
276 Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
277 checkScanStopRow(endKey) ||
278 done) {
279 close();
280 if (LOG.isTraceEnabled()) {
281 LOG.trace("Finished " + this.currentRegion);
282 }
283 return false;
284 }
285 localStartKey = endKey;
286 if (LOG.isTraceEnabled()) {
287 LOG.trace("Finished " + this.currentRegion);
288 }
289 } else {
290 localStartKey = this.scan.getStartRow();
291 }
292
293 if (LOG.isDebugEnabled() && this.currentRegion != null) {
294 // Only worth logging if NOT first region in scan.
295 LOG.debug("Advancing internal scanner to startKey at '" +
296 Bytes.toStringBinary(localStartKey) + "'");
297 }
298 try {
299 callable = getScannerCallable(localStartKey, nbRows);
300 // Open a scanner on the region server starting at the
301 // beginning of the region
302 call(callable, caller, scannerTimeout);
303 this.currentRegion = callable.getHRegionInfo();
304 if (this.scanMetrics != null) {
305 this.scanMetrics.countOfRegions.incrementAndGet();
306 }
307 } catch (IOException e) {
308 close();
309 throw e;
310 }
311 return true;
312 }
313
314 @VisibleForTesting
315 boolean isAnyRPCcancelled() {
316 return callable.isAnyRPCcancelled();
317 }
318
319 Result[] call(ScannerCallableWithReplicas callable,
320 RpcRetryingCaller<Result[]> caller, int scannerTimeout)
321 throws IOException, RuntimeException {
322 if (Thread.interrupted()) {
323 throw new InterruptedIOException();
324 }
325 // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
326 // we do a callWithRetries
327 return caller.callWithoutRetries(callable, scannerTimeout);
328 }
329
330 @InterfaceAudience.Private
331 protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
332 int nbRows) {
333 scan.setStartRow(localStartKey);
334 ScannerCallable s =
335 new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
336 this.rpcControllerFactory);
337 s.setCaching(nbRows);
338 ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
339 s, pool, primaryOperationTimeout, scan,
340 retries, scannerTimeout, caching, conf, caller);
341 return sr;
342 }
343
344 /**
345 * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
346 * application or TableInputFormat.Later, we could push it to other systems. We don't use
347 * metrics framework because it doesn't support multi-instances of the same metrics on the same
348 * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
349 *
350 * By default, scan metrics are disabled; if the application wants to collect them, this
351 * behavior can be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
352 *
353 * <p>This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
354 */
355 protected void writeScanMetrics() {
356 if (this.scanMetrics == null || scanMetricsPublished) {
357 return;
358 }
359 MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
360 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
361 scanMetricsPublished = true;
362 }
363
364 @Override
365 public Result next() throws IOException {
366 // If the scanner is closed and there's nothing left in the cache, next is a no-op.
367 if (cache.size() == 0 && this.closed) {
368 return null;
369 }
370 if (cache.size() == 0) {
371 loadCache();
372 }
373
374 if (cache.size() > 0) {
375 return cache.poll();
376 }
377
378 // if we exhausted this scanner before calling close, write out the scan metrics
379 writeScanMetrics();
380 return null;
381 }
382
383 @VisibleForTesting
384 public int getCacheSize() {
385 return cache != null ? cache.size() : 0;
386 }
387
/**
 * Contact the servers to load more {@link Result}s in the cache.
 * <p>
 * Repeatedly calls the current scanner callable and adds the groomed Results to the cache. The
 * loop keeps going while every returned Result was skipped, while the last response was a
 * heartbeat, or while the current region is done (see
 * {@link #doneWithRegion(long, int, boolean)}) and either partial Results are still pending or
 * a scanner for the next region could be set up. A selected set of exceptions (see the catch
 * block) resets the scanner and is retried up to {@link #getRetries()} times.
 * <p>
 * Note that every <code>continue</code> inside the do/while jumps to the loop condition at the
 * bottom, which is where a new scanner gets set up via possiblyNextScanner.
 * @throws IOException if the scan RPC fails with an exception that is not retried here, or if
 *           the retries are exhausted
 */
protected void loadCache() throws IOException {
  Result[] values = null;
  // Size and row budgets for this cache load; decremented as Results are cached.
  long remainingResultSize = maxScannerResultSize;
  int countdown = this.caching;
  // We need to reset it if it's a new callable that was created with a countdown in nextScanner
  callable.setCaching(this.caching);
  // True while one retry after an OutOfOrderScannerNextException is still allowed. Cleared on
  // the first such exception and set again after a call that returns without throwing.
  boolean retryAfterOutOfOrderException = true;
  // We don't expect that the server will have more results for us if
  // it doesn't tell us otherwise. We rely on the size or count of results
  boolean serverHasMoreResults = false;
  // Set when every Result of a response was filtered out by filterLoadedCell.
  boolean allResultsSkipped = false;
  // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should
  // make sure that we are not retrying indefinitely.
  int retriesLeft = getRetries();
  do {
    allResultsSkipped = false;
    try {
      // Server returns a null values if scanning is to stop. Else,
      // returns an empty array if scanning is to go on and we've just
      // exhausted current region.
      values = call(callable, caller, scannerTimeout);
      // When the replica switch happens, we need to do certain operations again.
      // The callable will openScanner with the right startkey but we need to pick up
      // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
      // of the loop as it happens for the cases where we see exceptions.
      // Since only openScanner would have happened, values would be null
      if (values == null && callable.switchedToADifferentReplica()) {
        // Any accumulated partial results are no longer valid since the callable will
        // openScanner with the correct startkey and we must pick up from there
        clearPartialResults();
        this.currentRegion = callable.getHRegionInfo();
        continue;
      }
      retryAfterOutOfOrderException = true;
    } catch (DoNotRetryIOException | NeedUnmanagedConnectionException e) {
      // An exception was thrown which makes any partial results that we were collecting
      // invalid. The scanner will need to be reset to the beginning of a row.
      clearPartialResults();

      // Unfortunately, DNRIOE is used in two different semantics.
      // (1) The first is to close the client scanner and bubble up the exception all the way
      // to the application. This is preferred when the exception is really un-recoverable
      // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
      // bucket usually.
      // (2) Second semantics is to close the current region scanner only, but continue the
      // client scanner by overriding the exception. This is usually UnknownScannerException,
      // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
      // application-level ClientScanner has to continue without bubbling up the exception to
      // the client. See RSRpcServices to see how it throws DNRIOE's.
      // See also: HBASE-16604, HBASE-17187

      // If exception is any but the list below throw it back to the client; else setup
      // the scanner and retry.
      Throwable cause = e.getCause();
      if ((cause != null && cause instanceof NotServingRegionException) ||
          (cause != null && cause instanceof RegionServerStoppedException) ||
          e instanceof OutOfOrderScannerNextException ||
          e instanceof UnknownScannerException ||
          e instanceof ScannerResetException) {
        // Pass. It is easier writing the if test as a list of what is allowed rather than
        // as a list of what is not allowed... so if in here, it means we do not throw, unless
        // the retry budget is used up.
        if (retriesLeft-- <= 0) {
          throw e; // no more retries
        }
      } else {
        throw e;
      }

      // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
      if (this.lastResult != null) {
        // The region has moved. We need to open a brand new scanner at the new location.
        // Reset the startRow to the row we've seen last so that the new scanner starts at
        // the correct row. Otherwise we may see previously returned rows again.
        // (ScannerCallable by now has "relocated" the correct region)
        if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
          // The last row was returned completely: restart just past it (just before it for a
          // reversed scan).
          if (scan.isReversed()) {
            scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
          } else {
            scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
          }
        } else {
          // we need to rescan this row because we only loaded part of the row before
          scan.setStartRow(lastResult.getRow());
        }
      }
      if (e instanceof OutOfOrderScannerNextException) {
        if (retryAfterOutOfOrderException) {
          retryAfterOutOfOrderException = false;
        } else {
          // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
          throw new DoNotRetryIOException("Failed after retry of " +
              "OutOfOrderScannerNextException: was there a rpc timeout?", e);
        }
      }
      // Clear region.
      this.currentRegion = null;
      // Set this to null so we don't try and do an rpc and close on remote server when
      // the exception we got was UnknownScanner or the Server is going down.
      callable = null;
      // This continue will take us to while at end of loop where we will set up new scanner.
      // NOTE(review): callable is only re-created if the loop condition reaches
      // possiblyNextScanner; if doneWithRegion(...) is false here (e.g. serverHasMoreResults
      // still true from an earlier iteration) the method would return with callable == null
      // and the next loadCache() would hit callable.setCaching above -- confirm whether that
      // combination can occur.
      continue;
    }
    long currentTime = System.currentTimeMillis();
    if (this.scanMetrics != null) {
      this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
    }
    lastNext = currentTime;
    // Groom the array of Results that we received back from the server before adding that
    // Results to the scanner's cache. If partial results are not allowed to be seen by the
    // caller, all book keeping will be performed within this method.
    List<Result> resultsToAddToCache =
        getResultsToAddToCache(values, callable.isHeartbeatMessage());
    if (!resultsToAddToCache.isEmpty()) {
      for (Result rs : resultsToAddToCache) {
        // Drop cells that were already handed out before a scanner reset; null means the
        // whole Result has been seen before.
        rs = filterLoadedCell(rs);
        if (rs == null) {
          continue;
        }
        cache.add(rs);
        for (Cell cell : rs.rawCells()) {
          remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
        }
        countdown--;
        this.lastResult = rs;
        // Remember the last cell only while rows may be delivered in pieces.
        if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
          updateLastCellLoadedToCache(this.lastResult);
        } else {
          this.lastCellLoadedToCache = null;
        }
      }
      if (cache.isEmpty()) {
        // all results have been seen before, we need to scan more.
        allResultsSkipped = true;
        continue;
      }
    }
    if (callable.isHeartbeatMessage()) {
      if (cache.size() > 0) {
        // Caller of this method just wants a Result. If we see a heartbeat message, it means
        // processing of the scan is taking a long time server side. Rather than continue to
        // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
        // unnecessary delays to the caller
        if (LOG.isTraceEnabled()) {
          LOG.trace("Heartbeat message received and cache contains Results."
              + " Breaking out of scan loop");
        }
        break;
      }
      continue;
    }

    // We expect that the server won't have more results for us when we exhaust
    // the size (bytes or count) of the results returned. If the server *does* inform us that
    // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
    // get results is the moreResults context valid.
    if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
      // Only adhere to more server results when we don't have any partialResults
      // as it keeps the outer loop logic the same.
      serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
    }
    // Values == null means server-side filter has determined we must STOP
    // !partialResults.isEmpty() means that we are still accumulating partial Results for a
    // row. We should not change scanners before we receive all the partial Results for that
    // row.
  } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
      || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
          && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
}
561
562 /**
563 * @param remainingResultSize
564 * @param remainingRows
565 * @param regionHasMoreResults
566 * @return true when the current region has been exhausted. When the current region has been
567 * exhausted, the region must be changed before scanning can continue
568 */
569 private boolean doneWithRegion(long remainingResultSize, int remainingRows,
570 boolean regionHasMoreResults) {
571 return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
572 }
573
574 /**
575 * This method ensures all of our book keeping regarding partial results is kept up to date. This
576 * method should be called once we know that the results we received back from the RPC request do
577 * not contain errors. We return a list of results that should be added to the cache. In general,
578 * this list will contain all NON-partial results from the input array (unless the client has
579 * specified that they are okay with receiving partial results)
580 * @param resultsFromServer The array of {@link Result}s returned from the server
581 * @param heartbeatMessage Flag indicating whether or not the response received from the server
582 * represented a complete response, or a heartbeat message that was sent to keep the
583 * client-server connection alive
584 * @return the list of results that should be added to the cache.
585 * @throws IOException
586 */
587 protected List<Result>
588 getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
589 throws IOException {
590 int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
591 List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
592
593 final boolean isBatchSet = scan != null && scan.getBatch() > 0;
594 final boolean allowPartials = scan != null && scan.getAllowPartialResults();
595
596 // If the caller has indicated in their scan that they are okay with seeing partial results,
597 // then simply add all results to the list. Note that since scan batching also returns results
598 // for a row in pieces we treat batch being set as equivalent to allowing partials. The
599 // implication of treating batching as equivalent to partial results is that it is possible
600 // the caller will receive a result back where the number of cells in the result is less than
601 // the batch size even though it may not be the last group of cells for that row.
602 if (allowPartials || isBatchSet) {
603 addResultsToList(resultsToAddToCache, resultsFromServer, 0,
604 (null == resultsFromServer ? 0 : resultsFromServer.length));
605 return resultsToAddToCache;
606 }
607
608 // If no results were returned it indicates that either we have the all the partial results
609 // necessary to construct the complete result or the server had to send a heartbeat message
610 // to the client to keep the client-server connection alive
611 if (resultsFromServer == null || resultsFromServer.length == 0) {
612 // If this response was an empty heartbeat message, then we have not exhausted the region
613 // and thus there may be more partials server side that still need to be added to the partial
614 // list before we form the complete Result
615 if (!partialResults.isEmpty() && !heartbeatMessage) {
616 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
617 clearPartialResults();
618 }
619
620 return resultsToAddToCache;
621 }
622
623 // In every RPC response there should be at most a single partial result. Furthermore, if
624 // there is a partial result, it is guaranteed to be in the last position of the array.
625 Result last = resultsFromServer[resultsFromServer.length - 1];
626 Result partial = last.isPartial() ? last : null;
627
628 if (LOG.isTraceEnabled()) {
629 StringBuilder sb = new StringBuilder();
630 sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
631 sb.append("partial != null: ").append(partial != null).append(",");
632 sb.append("number of partials so far: ").append(partialResults.size());
633 LOG.trace(sb.toString());
634 }
635
636 // There are three possibilities cases that can occur while handling partial results
637 //
638 // 1. (partial != null && partialResults.isEmpty())
639 // This is the first partial result that we have received. It should be added to
640 // the list of partialResults and await the next RPC request at which point another
641 // portion of the complete result will be received
642 //
643 // 2. !partialResults.isEmpty()
644 // Since our partialResults list is not empty it means that we have been accumulating partial
645 // Results for a particular row. We cannot form the complete/whole Result for that row until
646 // all partials for the row have been received. Thus we loop through all of the Results
647 // returned from the server and determine whether or not all partial Results for the row have
648 // been received. We know that we have received all of the partial Results for the row when:
649 // i) We notice a row change in the Results
650 // ii) We see a Result for the partial row that is NOT marked as a partial Result
651 //
652 // 3. (partial == null && partialResults.isEmpty())
653 // Business as usual. We are not accumulating partial results and there wasn't a partial result
654 // in the RPC response. This means that all of the results we received from the server are
655 // complete and can be added directly to the cache
656 if (partial != null && partialResults.isEmpty()) {
657 addToPartialResults(partial);
658
659 // Exclude the last result, it's a partial
660 addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
661 } else if (!partialResults.isEmpty()) {
662 for (int i = 0; i < resultsFromServer.length; i++) {
663 Result result = resultsFromServer[i];
664
665 // This result is from the same row as the partial Results. Add it to the list of partials
666 // and check if it was the last partial Result for that row
667 if (Bytes.equals(partialResultsRow, result.getRow())) {
668 addToPartialResults(result);
669
670 // If the result is not a partial, it is a signal to us that it is the last Result we
671 // need to form the complete Result client-side
672 if (!result.isPartial()) {
673 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
674 clearPartialResults();
675 }
676 } else {
677 // The row of this result differs from the row of the partial results we have received so
678 // far. If our list of partials isn't empty, this is a signal to form the complete Result
679 // since the row has now changed
680 if (!partialResults.isEmpty()) {
681 resultsToAddToCache.add(Result.createCompleteResult(partialResults));
682 clearPartialResults();
683 }
684
685 // It's possible that in one response from the server we receive the final partial for
686 // one row and receive a partial for a different row. Thus, make sure that all Results
687 // are added to the proper list
688 if (result.isPartial()) {
689 addToPartialResults(result);
690 } else {
691 resultsToAddToCache.add(result);
692 }
693 }
694 }
695 } else { // partial == null && partialResults.isEmpty() -- business as usual
696 addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
697 }
698
699 return resultsToAddToCache;
700 }
701
702 /**
703 * A convenience method for adding a Result to our list of partials. This method ensure that only
704 * Results that belong to the same row as the other partials can be added to the list.
705 * @param result The result that we want to add to our list of partial Results
706 * @throws IOException
707 */
708 private void addToPartialResults(final Result result) throws IOException {
709 final byte[] row = result.getRow();
710 if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
711 throw new IOException("Partial result row does not match. All partial results must come "
712 + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: "
713 + Bytes.toString(row));
714 }
715 partialResultsRow = row;
716 partialResults.add(result);
717 }
718
719 /**
720 * Convenience method for clearing the list of partials and resetting the partialResultsRow.
721 */
722 private void clearPartialResults() {
723 partialResults.clear();
724 partialResultsRow = null;
725 }
726
727 /**
728 * Helper method for adding results between the indices [start, end) to the outputList
729 * @param outputList the list that results will be added to
730 * @param inputArray the array that results are taken from
731 * @param start beginning index (inclusive)
732 * @param end ending index (exclusive)
733 */
734 private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
735 if (inputArray == null || start < 0 || end > inputArray.length) return;
736
737 for (int i = start; i < end; i++) {
738 outputList.add(inputArray[i]);
739 }
740 }
741
742 @Override
743 public void close() {
744 if (!scanMetricsPublished) writeScanMetrics();
745 if (callable != null) {
746 callable.setClose();
747 try {
748 call(callable, caller, scannerTimeout);
749 } catch (UnknownScannerException e) {
750 // We used to catch this error, interpret, and rethrow. However, we
751 // have since decided that it's not nice for a scanner's close to
752 // throw exceptions. Chances are it was just due to lease time out.
753 } catch (IOException e) {
754 /* An exception other than UnknownScanner is unexpected. */
755 LOG.warn("scanner failed to close. Exception follows: " + e);
756 }
757 callable = null;
758 }
759 closed = true;
760 }
761
762 /**
763 * Create the closest row before the specified row
764 * @param row
765 * @return a new byte array which is the closest front row of the specified one
766 */
767 protected static byte[] createClosestRowBefore(byte[] row) {
768 if (row == null) {
769 throw new IllegalArgumentException("The passed row is empty");
770 }
771 if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
772 return MAX_BYTE_ARRAY;
773 }
774 if (row[row.length - 1] == 0) {
775 return Arrays.copyOf(row, row.length - 1);
776 } else {
777 byte[] closestFrontRow = Arrays.copyOf(row, row.length);
778 closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
779 closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
780 return closestFrontRow;
781 }
782 }
783
784 @Override
785 public boolean renewLease() {
786 if (callable != null) {
787 // do not return any rows, do not advance the scanner
788 callable.setRenew(true);
789 try {
790 this.caller.callWithoutRetries(callable, this.scannerTimeout);
791 } catch (Exception e) {
792 return false;
793 } finally {
794 callable.setRenew(false);
795 }
796 return true;
797 }
798 return false;
799 }
800
801 protected void updateLastCellLoadedToCache(Result result) {
802 if (result.rawCells().length == 0) {
803 return;
804 }
805 this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
806 }
807
808 /**
809 * Compare two Cells considering reversed scanner.
810 * ReversedScanner only reverses rows, not columns.
811 */
812 private int compare(Cell a, Cell b) {
813 int r = 0;
814 if (currentRegion != null && currentRegion.isMetaRegion()) {
815 r = metaComparator.compareRows(a, b);
816 } else {
817 r = CellComparator.compareRows(a, b);
818 }
819 if (r != 0) {
820 return this.scan.isReversed() ? -r : r;
821 }
822 return CellComparator.compareWithoutRow(a, b);
823 }
824
825 private Result filterLoadedCell(Result result) {
826 // we only filter result when last result is partial
827 // so lastCellLoadedToCache and result should have same row key.
828 // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region;
829 // 3) read more cell. lastCellLoadedToCache and result will be not at same row.
830 if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
831 return result;
832 }
833 if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
834 // The first cell of this result is larger than the last cell of loadcache.
835 // If user do not allow partial result, it must be true.
836 return result;
837 }
838 if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
839 // The last cell of this result is smaller than the last cell of loadcache, skip all.
840 return null;
841 }
842
843 // The first one must not in filtered result, we start at the second.
844 int index = 1;
845 while (index < result.rawCells().length) {
846 if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
847 break;
848 }
849 index++;
850 }
851 Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
852 return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
853 }
854 }