001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.client;
020
021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
022
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.util.HashSet;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CancellationException;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.DoNotRetryIOException;
037import org.apache.hadoop.hbase.HRegionInfo;
038import org.apache.hadoop.hbase.RegionLocations;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
044import org.apache.hadoop.hbase.util.Pair;
045
046/**
047 * This class has the logic for handling scanners for regions with and without replicas.
048 * 1. A scan is attempted on the default (primary) region
049 * 2. The scanner sends all the RPCs to the default region until it is done, or, there
050 * is a timeout on the default (a timeout of zero is disallowed).
051 * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
052 * 4. The results from the first successful scanner are taken, and it is stored which server
053 * returned the results.
054 * 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
055 * in which case, the other replicas are queried (as in (3) above).
056 *
057 */
058@InterfaceAudience.Private
059class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
  private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
  // Callable of the replica currently serving the scan. call() sets it to null after it has
  // issued the close RPC for a scanner that was marked closed.
  volatile ScannerCallable currentScannerCallable;
  // Set to true by updateCurrentlyServingReplica when the winning scanner differs from the
  // previous current one; reset to false on every non-close call().
  AtomicBoolean replicaSwitched = new AtomicBoolean(false);
  // Connection used by call() to look up (or read cached) region locations.
  final ClusterConnection cConnection;
  // Executor backing the completion service in call(); call() also hands it to
  // updateCurrentlyServingReplica for the asynchronous close of losing scanners.
  protected final ExecutorService pool;
  // How long call() polls for the first submitted call before trying the other replicas.
  // The value is passed to poll() with TimeUnit.MICROSECONDS.
  protected final int timeBeforeReplicas;
  private final Scan scan;
  // Passed to RpcRetryingCallerWithReadReplicas.throwEnrichedException when a failure is rethrown.
  private final int retries;
  // Last element of the most recent non-empty winning result array; used to pick the start row
  // of scanners opened on other replicas.
  private Result lastResult;
  private final RpcRetryingCaller<Result[]> caller;
  private final TableName tableName;
  private Configuration conf;
  // Passed to cs.submit() for every scanner RPC and used as the call timeout when losing
  // scanners are closed.
  private int scannerTimeout;
  // Callables that were submitted and have not yet been cleared by
  // updateCurrentlyServingReplica.
  private Set<ScannerCallable> outstandingCallables = new HashSet<>();
  private boolean someRPCcancelled = false; //required for testing purposes only
  // Number of region locations resolved by call(); 0 until the first lookup has happened.
  private int regionReplication = 0;
076
077  public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
078      ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
079      int retries, int scannerTimeout, int caching, Configuration conf,
080      RpcRetryingCaller<Result []> caller) {
081    this.currentScannerCallable = baseCallable;
082    this.cConnection = cConnection;
083    this.pool = pool;
084    if (timeBeforeReplicas < 0) {
085      throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
086    }
087    this.timeBeforeReplicas = timeBeforeReplicas;
088    this.scan = scan;
089    this.retries = retries;
090    this.tableName = tableName;
091    this.conf = conf;
092    this.scannerTimeout = scannerTimeout;
093    this.caller = caller;
094  }
095
096  public void setClose() {
097    currentScannerCallable.setClose();
098  }
099
100  public void setRenew(boolean val) {
101    currentScannerCallable.setRenew(val);
102  }
103
104  public void setCaching(int caching) {
105    currentScannerCallable.setCaching(caching);
106  }
107
108  public int getCaching() {
109    return currentScannerCallable.getCaching();
110  }
111
112  public HRegionInfo getHRegionInfo() {
113    return currentScannerCallable.getHRegionInfo();
114  }
115
116  public MoreResults moreResultsInRegion() {
117    return currentScannerCallable.moreResultsInRegion();
118  }
119
120  public MoreResults moreResultsForScan() {
121    return currentScannerCallable.moreResultsForScan();
122  }
123
124  @Override
125  public Result [] call(int timeout) throws IOException {
126    // If the active replica callable was closed somewhere, invoke the RPC to
127    // really close it. In the case of regular scanners, this applies. We make couple
128    // of RPCs to a RegionServer, and when that region is exhausted, we set
129    // the closed flag. Then an RPC is required to actually close the scanner.
130    if (currentScannerCallable != null && currentScannerCallable.closed) {
131      // For closing we target that exact scanner (and not do replica fallback like in
132      // the case of normal reads)
133      if (LOG.isTraceEnabled()) {
134        LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
135      }
136      Result[] r = currentScannerCallable.call(timeout);
137      currentScannerCallable = null;
138      return r;
139    }
140    // We need to do the following:
141    //1. When a scan goes out to a certain replica (default or not), we need to
142    //   continue to hit that until there is a failure. So store the last successfully invoked
143    //   replica
144    //2. We should close the "losing" scanners (scanners other than the ones we hear back
145    //   from first)
146    //
147    // Since RegionReplication is a table attribute, it wont change as long as table is enabled,
148    // it just needs to be set once.
149
150    if (regionReplication <= 0) {
151      RegionLocations rl = null;
152      try {
153        rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
154            RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
155            currentScannerCallable.getRow());
156      } catch (RetriesExhaustedException | DoNotRetryIOException e) {
157        // We cannot get the primary replica region location, it is possible that the region server
158        // hosting meta table is down, it needs to proceed to try cached replicas directly.
159        if (cConnection instanceof ConnectionImplementation) {
160          rl = ((ConnectionImplementation) cConnection)
161              .getCachedLocation(tableName, currentScannerCallable.getRow());
162          if (rl == null) {
163            throw e;
164          }
165        } else {
166          // For completeness
167          throw e;
168        }
169      }
170      regionReplication = rl.size();
171    }
172    // allocate a boundedcompletion pool of some multiple of number of replicas.
173    // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
174    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
175        new ResultBoundedCompletionService<>(
176            RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
177            regionReplication * 5);
178
179    AtomicBoolean done = new AtomicBoolean(false);
180    replicaSwitched.set(false);
181    // submit call for the primary replica.
182    addCallsForCurrentReplica(cs);
183    int startIndex = 0;
184
185    try {
186      // wait for the timeout to see whether the primary responds back
187      Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
188          TimeUnit.MICROSECONDS); // Yes, microseconds
189      if (f != null) {
190        // After poll, if f is not null, there must be a completed task
191        Pair<Result[], ScannerCallable> r = f.get();
192        if (r != null && r.getSecond() != null) {
193          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
194        }
195        return r == null ? null : r.getFirst(); //great we got a response
196      }
197    } catch (ExecutionException e) {
198      // We ignore the ExecutionException and continue with the replicas
199      if (LOG.isDebugEnabled()) {
200        LOG.debug("Scan with primary region returns " + e.getCause());
201      }
202
203      // If rl's size is 1 or scan's consitency is strong, it needs to throw
204      // out the exception from the primary replica
205      if ((regionReplication == 1) || (scan.getConsistency() == Consistency.STRONG)) {
206        // Rethrow the first exception
207        RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
208      }
209
210      startIndex = 1;
211    } catch (CancellationException e) {
212      throw new InterruptedIOException(e.getMessage());
213    } catch (InterruptedException e) {
214      throw new InterruptedIOException(e.getMessage());
215    }
216
217    // submit call for the all of the secondaries at once
218    int endIndex = regionReplication;
219    if (scan.getConsistency() == Consistency.STRONG) {
220      // When scan's consistency is strong, do not send to the secondaries
221      endIndex = 1;
222    } else {
223      // TODO: this may be an overkill for large region replication
224      addCallsForOtherReplicas(cs, 0, regionReplication - 1);
225    }
226
227    try {
228      Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
229          TimeUnit.MILLISECONDS, startIndex, endIndex);
230
231      if (f == null) {
232        throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
233      }
234      Pair<Result[], ScannerCallable> r = f.get();
235
236      if (r != null && r.getSecond() != null) {
237        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
238      }
239      return r == null ? null : r.getFirst(); // great we got an answer
240
241    } catch (ExecutionException e) {
242      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
243    } catch (CancellationException e) {
244      throw new InterruptedIOException(e.getMessage());
245    } catch (InterruptedException e) {
246      throw new InterruptedIOException(e.getMessage());
247    } finally {
248      // We get there because we were interrupted or because one or more of the
249      // calls succeeded or failed. In all case, we stop all our tasks.
250      cs.cancelAll();
251    }
252    LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
253    throw new IOException("Imposible? Arrive at an unreachable line...");
254  }
255
256  private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
257      AtomicBoolean done, ExecutorService pool) {
258    if (done.compareAndSet(false, true)) {
259      if (currentScannerCallable != scanner) replicaSwitched.set(true);
260      currentScannerCallable = scanner;
261      // store where to start the replica scanner from if we need to.
262      if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
263      if (LOG.isTraceEnabled()) {
264        LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
265            " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
266      }
267      // close all outstanding replica scanners but the one we heard back from
268      outstandingCallables.remove(scanner);
269      for (ScannerCallable s : outstandingCallables) {
270        if (LOG.isTraceEnabled()) {
271          LOG.trace("Closing scanner id=" + s.scannerId +
272            ", replica=" + s.getHRegionInfo().getRegionId() +
273            " because slow and replica=" +
274            this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
275        }
276        // Submit the "close" to the pool since this might take time, and we don't
277        // want to wait for the "close" to happen yet. The "wait" will happen when
278        // the table is closed (when the awaitTermination of the underlying pool is called)
279        s.setClose();
280        final RetryingRPC r = new RetryingRPC(s);
281        pool.submit(new Callable<Void>(){
282          @Override
283          public Void call() throws Exception {
284            r.call(scannerTimeout);
285            return null;
286          }
287        });
288      }
289      // now clear outstandingCallables since we scheduled a close for all the contained scanners
290      outstandingCallables.clear();
291    }
292  }
293
294  /**
295   * When a scanner switches in the middle of scanning (the 'next' call fails
296   * for example), the upper layer {@link ClientScanner} needs to know
297   */
298  public boolean switchedToADifferentReplica() {
299    return replicaSwitched.get();
300  }
301
302  /**
303   * @return true when the most recent RPC response indicated that the response was a heartbeat
304   *         message. Heartbeat messages are sent back from the server when the processing of the
305   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
306   *         timeouts during long running scan operations.
307   */
308  public boolean isHeartbeatMessage() {
309    return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
310  }
311
312  public Cursor getCursor() {
313    return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
314  }
315
316  private void addCallsForCurrentReplica(
317      ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
318    RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
319    outstandingCallables.add(currentScannerCallable);
320    cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
321  }
322
323  private void addCallsForOtherReplicas(
324      ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {
325
326    for (int id = min; id <= max; id++) {
327      if (currentScannerCallable.id == id) {
328        continue; //this was already scheduled earlier
329      }
330      ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
331      setStartRowForReplicaCallable(s);
332      outstandingCallables.add(s);
333      RetryingRPC retryingOnReplica = new RetryingRPC(s);
334      cs.submit(retryingOnReplica, scannerTimeout, id);
335    }
336  }
337
338  /**
339   * Set the start row for the replica callable based on the state of the last result received.
340   * @param callable The callable to set the start row on
341   */
342  private void setStartRowForReplicaCallable(ScannerCallable callable) {
343    if (this.lastResult == null || callable == null) {
344      return;
345    }
346    // 1. The last result was a partial result which means we have not received all of the cells
347    // for this row. Thus, use the last result's row as the start row. If a replica switch
348    // occurs, the scanner will ensure that any accumulated partial results are cleared,
349    // and the scan can resume from this row.
350    // 2. The last result was not a partial result which means it contained all of the cells for
351    // that row (we no longer need any information from it). Set the start row to the next
352    // closest row that could be seen.
353    callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.mayHaveMoreCellsInRow());
354  }
355
356  @VisibleForTesting
357  boolean isAnyRPCcancelled() {
358    return someRPCcancelled;
359  }
360
361  class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
362    final ScannerCallable callable;
363    RpcRetryingCaller<Result[]> caller;
364    private volatile boolean cancelled = false;
365
366    RetryingRPC(ScannerCallable callable) {
367      this.callable = callable;
368      // For the Consistency.STRONG (default case), we reuse the caller
369      // to keep compatibility with what is done in the past
370      // For the Consistency.TIMELINE case, we can't reuse the caller
371      // since we could be making parallel RPCs (caller.callWithRetries is synchronized
372      // and we can't invoke it multiple times at the same time)
373      this.caller = ScannerCallableWithReplicas.this.caller;
374      if (scan.getConsistency() == Consistency.TIMELINE) {
375        this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
376            .<Result[]>newCaller();
377      }
378    }
379
380    @Override
381    public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
382      // since the retries is done within the ResultBoundedCompletionService,
383      // we don't invoke callWithRetries here
384      if (cancelled) {
385        return null;
386      }
387      Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
388      return new Pair<>(res, this.callable);
389    }
390
391    @Override
392    public void prepare(boolean reload) throws IOException {
393      if (cancelled) return;
394
395      if (Thread.interrupted()) {
396        throw new InterruptedIOException();
397      }
398
399      callable.prepare(reload);
400    }
401
402    @Override
403    public void throwable(Throwable t, boolean retrying) {
404      callable.throwable(t, retrying);
405    }
406
407    @Override
408    public String getExceptionMessageAdditionalDetail() {
409      return callable.getExceptionMessageAdditionalDetail();
410    }
411
412    @Override
413    public long sleep(long pause, int tries) {
414      return callable.sleep(pause, tries);
415    }
416
417    @Override
418    public void cancel() {
419      cancelled = true;
420      caller.cancel();
421      if (callable.getRpcController() != null) {
422        callable.getRpcController().startCancel();
423      }
424      someRPCcancelled = true;
425    }
426
427    @Override
428    public boolean isCancelled() {
429      return cancelled;
430    }
431  }
432
  /**
   * Does nothing at this level. Each {@link RetryingRPC} prepares the scanner callable it wraps
   * in its own {@code prepare}.
   *
   * @param reload ignored
   */
  @Override
  public void prepare(boolean reload) throws IOException {
  }
436
437  @Override
438  public void throwable(Throwable t, boolean retrying) {
439    currentScannerCallable.throwable(t, retrying);
440  }
441
442  @Override
443  public String getExceptionMessageAdditionalDetail() {
444    return currentScannerCallable.getExceptionMessageAdditionalDetail();
445  }
446
447  @Override
448  public long sleep(long pause, int tries) {
449    return currentScannerCallable.sleep(pause, tries);
450  }
451}