View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
22  
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.util.HashSet;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.CancellationException;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.TimeoutException;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.HRegionInfo;
40  import org.apache.hadoop.hbase.RegionLocations;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
45  import org.apache.hadoop.hbase.util.Pair;
46
47  import com.google.common.annotations.VisibleForTesting;
48
49  /**
50   * This class has the logic for handling scanners for regions with and without replicas.
51   * 1. A scan is attempted on the default (primary) region
52   * 2. The scanner sends all the RPCs to the default region until it is done, or, there
53   * is a timeout on the default (a timeout of zero is disallowed).
54   * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
55   * 4. The results from the first successful scanner are taken, and it is stored which server
56   * returned the results.
57   * 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
58   * in which case, the other replicas are queried (as in (3) above).
59   *
60   */
61  @InterfaceAudience.Private
62  class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
63    private static final Log LOG = LogFactory.getLog(ScannerCallableWithReplicas.class);
64    volatile ScannerCallable currentScannerCallable;
65    AtomicBoolean replicaSwitched = new AtomicBoolean(false);
66    final ClusterConnection cConnection;
67    protected final ExecutorService pool;
68    protected final int timeBeforeReplicas;
69    private final Scan scan;
70    private final int retries;
71    private Result lastResult;
72    private final RpcRetryingCaller<Result[]> caller;
73    private final TableName tableName;
74    private Configuration conf;
75    private int scannerTimeout;
76    private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
77    private boolean someRPCcancelled = false; //required for testing purposes only
78
79    public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
80        ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
81        int retries, int scannerTimeout, int caching, Configuration conf,
82        RpcRetryingCaller<Result []> caller) {
83      this.currentScannerCallable = baseCallable;
84      this.cConnection = cConnection;
85      this.pool = pool;
86      if (timeBeforeReplicas < 0) {
87        throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
88      }
89      this.timeBeforeReplicas = timeBeforeReplicas;
90      this.scan = scan;
91      this.retries = retries;
92      this.tableName = tableName;
93      this.conf = conf;
94      this.scannerTimeout = scannerTimeout;
95      this.caller = caller;
96    }
97
98    public void setClose() {
99      currentScannerCallable.setClose();
100   }
101
102   public void setRenew(boolean val) {
103     currentScannerCallable.setRenew(val);
104   }
105
106   public void setCaching(int caching) {
107     currentScannerCallable.setCaching(caching);
108   }
109
110   public int getCaching() {
111     return currentScannerCallable.getCaching();
112   }
113
114   public HRegionInfo getHRegionInfo() {
115     return currentScannerCallable.getHRegionInfo();
116   }
117
118   public boolean getServerHasMoreResults() {
119     return currentScannerCallable.getServerHasMoreResults();
120   }
121
122   public void setServerHasMoreResults(boolean serverHasMoreResults) {
123     currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
124   }
125
126   public boolean hasMoreResultsContext() {
127     return currentScannerCallable.hasMoreResultsContext();
128   }
129
130   public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
131     currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
132   }
133
134   @Override
135   public Result [] call(int timeout) throws IOException {
136     // If the active replica callable was closed somewhere, invoke the RPC to
137     // really close it. In the case of regular scanners, this applies. We make couple
138     // of RPCs to a RegionServer, and when that region is exhausted, we set
139     // the closed flag. Then an RPC is required to actually close the scanner.
140     if (currentScannerCallable != null && currentScannerCallable.closed) {
141       // For closing we target that exact scanner (and not do replica fallback like in
142       // the case of normal reads)
143       if (LOG.isTraceEnabled()) {
144         LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
145       }
146       Result[] r = currentScannerCallable.call(timeout);
147       currentScannerCallable = null;
148       return r;
149     }
150     // We need to do the following:
151     //1. When a scan goes out to a certain replica (default or not), we need to
152     //   continue to hit that until there is a failure. So store the last successfully invoked
153     //   replica
154     //2. We should close the "losing" scanners (scanners other than the ones we hear back
155     //   from first)
156     //
157     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
158         RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
159         currentScannerCallable.getRow());
160
161     // allocate a boundedcompletion pool of some multiple of number of replicas.
162     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
163     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
164         new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
165             RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
166             rl.size() * 5);
167
168     AtomicBoolean done = new AtomicBoolean(false);
169     replicaSwitched.set(false);
170     // submit call for the primary replica.
171     addCallsForCurrentReplica(cs, rl);
172
173     try {
174       // wait for the timeout to see whether the primary responds back
175       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
176           TimeUnit.MICROSECONDS); // Yes, microseconds
177       if (f != null) {
178         Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
179         if (r != null && r.getSecond() != null) {
180           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
181         }
182         return r == null ? null : r.getFirst(); //great we got a response
183       }
184     } catch (ExecutionException e) {
185       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
186     } catch (CancellationException e) {
187       throw new InterruptedIOException(e.getMessage());
188     } catch (InterruptedException e) {
189       throw new InterruptedIOException(e.getMessage());
190     } catch (TimeoutException e) {
191       throw new InterruptedIOException(e.getMessage());
192     }
193
194     // submit call for the all of the secondaries at once
195     // TODO: this may be an overkill for large region replication
196     addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
197
198     try {
199       long start = EnvironmentEdgeManager.currentTime();
200       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
201       long duration = EnvironmentEdgeManager.currentTime() - start;
202       if (f != null) {
203         Pair<Result[], ScannerCallable> r = f.get(timeout - duration, TimeUnit.MILLISECONDS);
204         if (r != null && r.getSecond() != null) {
205           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
206         }
207         return r == null ? null : r.getFirst(); // great we got an answer
208       } else {
209         throw new IOException("Failed to get result within timeout, timeout="
210             + timeout + "ms");
211       }
212     } catch (ExecutionException e) {
213       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
214     } catch (CancellationException e) {
215       throw new InterruptedIOException(e.getMessage());
216     } catch (InterruptedException e) {
217       throw new InterruptedIOException(e.getMessage());
218     } catch (TimeoutException e) {
219       throw new InterruptedIOException(e.getMessage());
220     } finally {
221       // We get there because we were interrupted or because one or more of the
222       // calls succeeded or failed. In all case, we stop all our tasks.
223       cs.cancelAll();
224     }
225     LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
226     throw new IOException("Imposible? Arrive at an unreachable line...");
227   }
228
229   private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
230       AtomicBoolean done, ExecutorService pool) {
231     if (done.compareAndSet(false, true)) {
232       if (currentScannerCallable != scanner) replicaSwitched.set(true);
233       currentScannerCallable = scanner;
234       // store where to start the replica scanner from if we need to.
235       if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
236       if (LOG.isTraceEnabled()) {
237         LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
238             " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
239       }
240       // close all outstanding replica scanners but the one we heard back from
241       outstandingCallables.remove(scanner);
242       for (ScannerCallable s : outstandingCallables) {
243         if (LOG.isTraceEnabled()) {
244           LOG.trace("Closing scanner id=" + s.scannerId +
245             ", replica=" + s.getHRegionInfo().getRegionId() +
246             " because slow and replica=" +
247             this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
248         }
249         // Submit the "close" to the pool since this might take time, and we don't
250         // want to wait for the "close" to happen yet. The "wait" will happen when
251         // the table is closed (when the awaitTermination of the underlying pool is called)
252         s.setClose();
253         final RetryingRPC r = new RetryingRPC(s);
254         pool.submit(new Callable<Void>(){
255           @Override
256           public Void call() throws Exception {
257             r.call(scannerTimeout);
258             return null;
259           }
260         });
261       }
262       // now clear outstandingCallables since we scheduled a close for all the contained scanners
263       outstandingCallables.clear();
264     }
265   }
266
267   /**
268    * When a scanner switches in the middle of scanning (the 'next' call fails
269    * for example), the upper layer {@link ClientScanner} needs to know
270    */
271   public boolean switchedToADifferentReplica() {
272     return replicaSwitched.get();
273   }
274
275   /**
276    * @return true when the most recent RPC response indicated that the response was a heartbeat
277    *         message. Heartbeat messages are sent back from the server when the processing of the
278    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
279    *         timeouts during long running scan operations.
280    */
281   public boolean isHeartbeatMessage() {
282     return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
283   }
284
285   private void addCallsForCurrentReplica(
286       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
287     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
288     outstandingCallables.add(currentScannerCallable);
289     cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
290   }
291
292   private void addCallsForOtherReplicas(
293       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
294       int min, int max) {
295     if (scan.getConsistency() == Consistency.STRONG) {
296       return; // not scheduling on other replicas for strong consistency
297     }
298     for (int id = min; id <= max; id++) {
299       if (currentScannerCallable.id == id) {
300         continue; //this was already scheduled earlier
301       }
302       ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
303       setStartRowForReplicaCallable(s);
304       outstandingCallables.add(s);
305       RetryingRPC retryingOnReplica = new RetryingRPC(s);
306       cs.submit(retryingOnReplica, scannerTimeout, id);
307     }
308   }
309
310   /**
311    * Set the start row for the replica callable based on the state of the last result received.
312    * @param callable The callable to set the start row on
313    */
314   private void setStartRowForReplicaCallable(ScannerCallable callable) {
315     if (this.lastResult == null || callable == null) return;
316
317     if (this.lastResult.isPartial()) {
318       // The last result was a partial result which means we have not received all of the cells
319       // for this row. Thus, use the last result's row as the start row. If a replica switch
320       // occurs, the scanner will ensure that any accumulated partial results are cleared,
321       // and the scan can resume from this row.
322       callable.getScan().setStartRow(this.lastResult.getRow());
323     } else {
324       // The last result was not a partial result which means it contained all of the cells for
325       // that row (we no longer need any information from it). Set the start row to the next
326       // closest row that could be seen.
327       if (callable.getScan().isReversed()) {
328         callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
329       } else {
330         callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
331       }
332     }
333   }
334
335   @VisibleForTesting
336   boolean isAnyRPCcancelled() {
337     return someRPCcancelled;
338   }
339
340   class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
341     final ScannerCallable callable;
342     RpcRetryingCaller<Result[]> caller;
343     private volatile boolean cancelled = false;
344
345     RetryingRPC(ScannerCallable callable) {
346       this.callable = callable;
347       // For the Consistency.STRONG (default case), we reuse the caller
348       // to keep compatibility with what is done in the past
349       // For the Consistency.TIMELINE case, we can't reuse the caller
350       // since we could be making parallel RPCs (caller.callWithRetries is synchronized
351       // and we can't invoke it multiple times at the same time)
352       this.caller = ScannerCallableWithReplicas.this.caller;
353       if (scan.getConsistency() == Consistency.TIMELINE) {
354         this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
355             .<Result[]>newCaller();
356       }
357     }
358
359     @Override
360     public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
361       // since the retries is done within the ResultBoundedCompletionService,
362       // we don't invoke callWithRetries here
363       if (cancelled) {
364         return null;
365       }
366       Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
367       return new Pair<Result[], ScannerCallable>(res, this.callable);
368     }
369
370     @Override
371     public void prepare(boolean reload) throws IOException {
372       if (cancelled) return;
373
374       if (Thread.interrupted()) {
375         throw new InterruptedIOException();
376       }
377
378       callable.prepare(reload);
379     }
380
381     @Override
382     public void throwable(Throwable t, boolean retrying) {
383       callable.throwable(t, retrying);
384     }
385
386     @Override
387     public String getExceptionMessageAdditionalDetail() {
388       return callable.getExceptionMessageAdditionalDetail();
389     }
390
391     @Override
392     public long sleep(long pause, int tries) {
393       return callable.sleep(pause, tries);
394     }
395
396     @Override
397     public void cancel() {
398       cancelled = true;
399       caller.cancel();
400       if (callable.getRpcController() != null) {
401         callable.getRpcController().startCancel();
402       }
403       someRPCcancelled = true;
404     }
405
406     @Override
407     public boolean isCancelled() {
408       return cancelled;
409     }
410   }
411
412   @Override
413   public void prepare(boolean reload) throws IOException {
414   }
415
416   @Override
417   public void throwable(Throwable t, boolean retrying) {
418     currentScannerCallable.throwable(t, retrying);
419   }
420
421   @Override
422   public String getExceptionMessageAdditionalDetail() {
423     return currentScannerCallable.getExceptionMessageAdditionalDetail();
424   }
425
426   @Override
427   public long sleep(long pause, int tries) {
428     return currentScannerCallable.sleep(pause, tries);
429   }
430 }