/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

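/*
 * A minimal, illustrative client-side sketch (not part of this class, and assuming an
 * already-open Connection named `connection` and a TableName named `tableName`): the replica
 * fallback implemented below is exercised by scans that opt into timeline consistency, roughly:
 *
 *   Scan scan = new Scan();
 *   scan.setConsistency(Consistency.TIMELINE);
 *   try (Table table = connection.getTable(tableName);
 *     ResultScanner scanner = table.getScanner(scan)) {
 *     for (Result result : scanner) {
 *       // Result#isStale() tells whether the row came back from a secondary replica.
 *     }
 *   }
 */
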
/**
 * This class has the logic for handling scanners for regions with and without replicas.
 * <ol>
 * <li>A scan is attempted on the default (primary) region, or on a specific region.</li>
 * <li>The scanner sends all the RPCs to the default/specific region until it is done, or there is
 * a timeout on the default/specific region (a timeout of zero is disallowed).</li>
 * <li>If there is a timeout in (2) above, scanner(s) are opened on the non-default replica(s),
 * but only for Consistency.TIMELINE scans without a specific replica id.</li>
 * <li>The results from the first scanner to respond successfully are taken, and the server that
 * returned them is remembered.</li>
 * <li>The next RPCs go to the server remembered in (4) until it is done or there is a timeout,
 * in which case the other replicas are queried again (as in (3) above).</li>
 * </ol>
 */
@InterfaceAudience.Private
class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
  private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
  volatile ScannerCallable currentScannerCallable;
  AtomicBoolean replicaSwitched = new AtomicBoolean(false);
  final ClusterConnection cConnection;
  protected final ExecutorService pool;
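  // How long to wait on the current replica before fanning the scan out to the other replicas,
  // in microseconds (see the poll in call(int)).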
  protected final int timeBeforeReplicas;
  private final Scan scan;
  private final int retries;
  private Result lastResult;
  private final RpcRetryingCaller<Result[]> caller;
  private final TableName tableName;
  private Configuration conf;
  private final int scannerTimeout;
  private final int readRpcTimeout;
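  // Replica scanners that have been submitted but not yet heard back from; the "losing" ones
  // are closed in updateCurrentlyServingReplica() once a winner responds.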
  private Set<ScannerCallable> outstandingCallables = new HashSet<>();
  private boolean someRPCcancelled = false; // required for testing purposes only
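  // Cached region replication for the table; 0 means it has not been looked up yet and will be
  // fetched from the region locations on the first call (see call(int)).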
  private int regionReplication = 0;

  public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
    ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
    int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf,
    RpcRetryingCaller<Result[]> caller) {
    this.currentScannerCallable = baseCallable;
    this.cConnection = cConnection;
    this.pool = pool;
    if (timeBeforeReplicas < 0) {
      throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
    }
    this.timeBeforeReplicas = timeBeforeReplicas;
    this.scan = scan;
    this.retries = retries;
    this.tableName = tableName;
    this.conf = conf;
    this.readRpcTimeout = readRpcTimeout;
    this.scannerTimeout = scannerTimeout;
    this.caller = caller;
  }

  public void setClose() {
    if (currentScannerCallable != null) {
      currentScannerCallable.setClose();
    } else {
      LOG.warn("Calling close on ScannerCallable reference that is already null, "
        + "which shouldn't happen.");
    }
  }

  public void setRenew(boolean val) {
    currentScannerCallable.setRenew(val);
  }

  public void setCaching(int caching) {
    currentScannerCallable.setCaching(caching);
  }

  public int getCaching() {
    return currentScannerCallable.getCaching();
  }

  public HRegionInfo getHRegionInfo() {
    return currentScannerCallable.getHRegionInfo();
  }

  public MoreResults moreResultsInRegion() {
    return currentScannerCallable.moreResultsInRegion();
  }

  public MoreResults moreResultsForScan() {
    return currentScannerCallable.moreResultsForScan();
  }

  @Override
  public Result[] call(int timeout) throws IOException {
    // If the active replica callable was closed somewhere, invoke the RPC to really close it.
    // This applies to regular scanners: we make a couple of RPCs to a RegionServer, and when
    // that region is exhausted, we set the closed flag. Then an RPC is required to actually
    // close the scanner.
    if (currentScannerCallable != null && currentScannerCallable.closed) {
      // For closing we target that exact scanner (and do not fall back to replicas as we do for
      // normal reads)
      if (LOG.isTraceEnabled()) {
        LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
      }
      Result[] r = currentScannerCallable.call(timeout);
      currentScannerCallable = null;
      return r;
    } else if (currentScannerCallable == null) {
      LOG.warn("Another call received, but our ScannerCallable is already null. "
        + "This shouldn't happen, but there's not much to do, so logging and returning null.");
      return null;
    }
    // We need to do the following:
    // 1. When a scan goes out to a certain replica (default or not), we need to continue to hit
    // that replica until there is a failure. So store the last successfully invoked replica.
    // 2. We should close the "losing" scanners (scanners other than the ones we hear back
    // from first).
    //
    // Since RegionReplication is a table attribute, it won't change as long as the table is
    // enabled, so it only needs to be set once.

    if (regionReplication <= 0) {
      RegionLocations rl = null;
      try {
        rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
          RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
          currentScannerCallable.getRow());
      } catch (RetriesExhaustedException | DoNotRetryIOException e) {
        // We cannot get the primary replica region location; it is possible that the region
        // server hosting the meta table is down, so proceed to try the cached replica locations
        // directly.
        if (cConnection instanceof ConnectionImplementation) {
          rl = ((ConnectionImplementation) cConnection).getCachedLocation(tableName,
            currentScannerCallable.getRow());
          if (rl == null) {
            throw e;
          }
        } else {
          // For completeness
          throw e;
        }
      }
      regionReplication = rl.size();
    }
    // Allocate a bounded completion pool sized at some multiple of the number of replicas.
    // We want to accommodate some RPCs for redundant replica scans that may still be in progress.
    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
      new ResultBoundedCompletionService<>(
        RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
        regionReplication * 5);

    AtomicBoolean done = new AtomicBoolean(false);
    replicaSwitched.set(false);
    // submit the call for the primary replica or the user-specified replica
    addCallsForCurrentReplica(cs);
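    // startIndex/endIndex bound which of the submitted callables (keyed by the id passed to
    // cs.submit, i.e. the replica id) are considered when polling the completion service for the
    // first success below.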
    int startIndex = 0;

    try {
      // wait for the timeout to see whether the primary responds
      Future<Pair<Result[], ScannerCallable>> f =
        cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
      if (f != null) {
        // After poll, if f is not null, there must be a completed task
        Pair<Result[], ScannerCallable> r = f.get();
        if (r != null && r.getSecond() != null) {
          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
        }
        return r == null ? null : r.getFirst(); // great we got a response
      }
    } catch (ExecutionException e) {
      // We ignore the ExecutionException and continue with the replicas
      if (LOG.isDebugEnabled()) {
        LOG.debug("Scan with primary region returns " + e.getCause());
      }

      // If the region replication is 1, or the scan's consistency is strong, or the scan targets
      // a specific replica, throw the exception from the primary replica.
      if (
        regionReplication == 1 || scan.getConsistency() == Consistency.STRONG
          || scan.getReplicaId() >= 0
      ) {
        // Rethrow the first exception
        RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
      }
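      // The primary already failed above, so start polling from the secondaries below.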
      startIndex = 1;
    } catch (CancellationException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    }

    // submit calls for all of the secondaries at once
    int endIndex = regionReplication;
    if (scan.getConsistency() == Consistency.STRONG || scan.getReplicaId() >= 0) {
      // When the scan's consistency is strong, or the scan targets a specific replica, do not
      // send to the secondaries
      endIndex = 1;
    } else {
      // TODO: this may be overkill for large region replication
      addCallsForOtherReplicas(cs, 0, regionReplication - 1);
    }

    try {
      Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
        TimeUnit.MILLISECONDS, startIndex, endIndex);

      if (f == null) {
        throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
      }
      Pair<Result[], ScannerCallable> r = f.get();

      if (r != null && r.getSecond() != null) {
        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
      }
      return r == null ? null : r.getFirst(); // great we got an answer

    } catch (ExecutionException e) {
      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
    } catch (CancellationException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    } finally {
      // We get here because we were interrupted or because one or more of the calls succeeded or
      // failed. In all cases, we stop all our tasks.
      cs.cancelAll();
    }
    LOG.error("Impossible? Arrived at an unreachable line..."); // unreachable
    throw new IOException("Impossible? Arrived at an unreachable line...");
  }

  @SuppressWarnings("FutureReturnValueIgnored")
  private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
    AtomicBoolean done, ExecutorService pool) {
    if (done.compareAndSet(false, true)) {
      if (currentScannerCallable != scanner) replicaSwitched.set(true);
      currentScannerCallable = scanner;
      // store where to start the replica scanner from if we need to.
      if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
      if (LOG.isTraceEnabled()) {
        LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId
          + " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
      }
      // close all outstanding replica scanners but the one we heard back from
      outstandingCallables.remove(scanner);
      for (ScannerCallable s : outstandingCallables) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Closing scanner id=" + s.scannerId + ", replica="
            + s.getHRegionInfo().getReplicaId() + " because it was slow and replica="
            + this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
        }
        // Submit the "close" to the pool since this might take time, and we don't
        // want to wait for the "close" to happen yet. The "wait" will happen when
        // the table is closed (when the awaitTermination of the underlying pool is called)
        s.setClose();
        final RetryingRPC r = new RetryingRPC(s);
        pool.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            r.call(scannerTimeout);
            return null;
          }
        });
      }
      // now clear outstandingCallables since we scheduled a close for all the contained scanners
      outstandingCallables.clear();
    }
  }

  /**
   * When a scanner switches in the middle of scanning (the 'next' call fails for example), the
   * upper layer {@link ClientScanner} needs to know about it.
   */
  public boolean switchedToADifferentReplica() {
    return replicaSwitched.get();
  }

  /**
   * Returns true when the most recent RPC response indicated that the response was a heartbeat
   * message. Heartbeat messages are sent back from the server when the processing of the scan
   * request exceeds a certain time threshold. Heartbeats allow the server to avoid timeouts during
   * long running scan operations.
   */
  public boolean isHeartbeatMessage() {
    return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
  }

  public Cursor getCursor() {
    return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
  }

  private void
    addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
    RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
    outstandingCallables.add(currentScannerCallable);
    cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id);
  }

  private void addCallsForOtherReplicas(
    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {

    for (int id = min; id <= max; id++) {
      if (currentScannerCallable.id == id) {
        continue; // this was already scheduled earlier
      }
      ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
      setStartRowForReplicaCallable(s);
      outstandingCallables.add(s);
      RetryingRPC retryingOnReplica = new RetryingRPC(s);
      cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
    }
  }

  /**
   * Set the start row for the replica callable based on the state of the last result received.
   * @param callable The callable to set the start row on
   */
  private void setStartRowForReplicaCallable(ScannerCallable callable) {
    if (this.lastResult == null || callable == null) {
      return;
    }
    // 1. The last result was a partial result which means we have not received all of the cells
    // for this row. Thus, use the last result's row as the start row. If a replica switch
    // occurs, the scanner will ensure that any accumulated partial results are cleared,
    // and the scan can resume from this row.
    // 2. The last result was not a partial result which means it contained all of the cells for
    // that row (we no longer need any information from it). Set the start row to the next
    // closest row that could be seen.
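    // mayHaveMoreCellsInRow() maps to the 'inclusive' flag of withStartRow: re-read the row from
    // its start if it was only partially returned, otherwise start just after it.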
    callable.getScan().withStartRow(this.lastResult.getRow(),
      this.lastResult.mayHaveMoreCellsInRow());
  }

  boolean isAnyRPCcancelled() {
    return someRPCcancelled;
  }

  class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
    final ScannerCallable callable;
    RpcRetryingCaller<Result[]> caller;
    private volatile boolean cancelled = false;

    RetryingRPC(ScannerCallable callable) {
      this.callable = callable;
      // For Consistency.STRONG (the default case), we reuse the caller
      // to keep compatibility with past behavior.
      // For the Consistency.TIMELINE case, we can't reuse the caller
      // since we could be making parallel RPCs (caller.callWithRetries is synchronized
      // and we can't invoke it multiple times at the same time)
      this.caller = ScannerCallableWithReplicas.this.caller;
      if (scan.getConsistency() == Consistency.TIMELINE) {
        this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf).<
          Result[]> newCaller();
      }
    }

    @Override
    public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
      // since retrying is done within the ResultBoundedCompletionService,
      // we don't invoke callWithRetries here
      if (cancelled) {
        return null;
      }
      Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
      return new Pair<>(res, this.callable);
    }

    @Override
    public void prepare(boolean reload) throws IOException {
      if (cancelled) return;

      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }

      callable.prepare(reload);
    }

    @Override
    public void throwable(Throwable t, boolean retrying) {
      callable.throwable(t, retrying);
    }

    @Override
    public String getExceptionMessageAdditionalDetail() {
      return callable.getExceptionMessageAdditionalDetail();
    }

    @Override
    public long sleep(long pause, int tries) {
      return callable.sleep(pause, tries);
    }

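    // Best-effort cancellation: flag this task as cancelled, cancel the retrying caller, and ask
    // the RPC controller (if any) to cancel the in-flight call.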
    @Override
    public void cancel() {
      cancelled = true;
      caller.cancel();
      if (callable.getRpcController() != null) {
        callable.getRpcController().startCancel();
      }
      someRPCcancelled = true;
    }

    @Override
    public boolean isCancelled() {
      return cancelled;
    }
  }

  @Override
  public void prepare(boolean reload) throws IOException {
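    // Intentionally a no-op: each underlying ScannerCallable is prepared individually through
    // RetryingRPC#prepare, so there is nothing to do at this level.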
  }

  @Override
  public void throwable(Throwable t, boolean retrying) {
    currentScannerCallable.throwable(t, retrying);
  }

  @Override
  public String getExceptionMessageAdditionalDetail() {
    return currentScannerCallable.getExceptionMessageAdditionalDetail();
  }

  @Override
  public long sleep(long pause, int tries) {
    return currentScannerCallable.sleep(pause, tries);
  }
}