View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
22  
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.util.ArrayList;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Set;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.CancellationException;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.RegionLocations;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.classification.InterfaceAudience;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.Pair;
46  
47  import com.google.common.annotations.VisibleForTesting;
48  
49  /**
50   * This class has the logic for handling scanners for regions with and without replicas.
51   * 1. A scan is attempted on the default (primary) region
52   * 2. The scanner sends all the RPCs to the default region until it is done, or, there
53   * is a timeout on the default (a timeout of zero is disallowed).
54   * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
55   * 4. The results from the first successful scanner are taken, and it is stored which server
56   * returned the results.
57   * 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
58   * in which case, the other replicas are queried (as in (3) above).
59   *
60   */
61  @InterfaceAudience.Private
62  class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
63    private final Log LOG = LogFactory.getLog(this.getClass());
64    volatile ScannerCallable currentScannerCallable;
65    AtomicBoolean replicaSwitched = new AtomicBoolean(false);
66    final ClusterConnection cConnection;
67    protected final ExecutorService pool;
68    protected final int timeBeforeReplicas;
69    private final Scan scan;
70    private final int retries;
71    private Result lastResult;
72    private final RpcRetryingCaller<Result[]> caller;
73    private final TableName tableName;
74    private Configuration conf;
75    private int scannerTimeout;
76    private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
77    private boolean someRPCcancelled = false; //required for testing purposes only
78  
79    public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
80        ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
81        int retries, int scannerTimeout, int caching, Configuration conf,
82        RpcRetryingCaller<Result []> caller) {
83      this.currentScannerCallable = baseCallable;
84      this.cConnection = cConnection;
85      this.pool = pool;
86      if (timeBeforeReplicas < 0) {
87        throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
88      }
89      this.timeBeforeReplicas = timeBeforeReplicas;
90      this.scan = scan;
91      this.retries = retries;
92      this.tableName = tableName;
93      this.conf = conf;
94      this.scannerTimeout = scannerTimeout;
95      this.caller = caller;
96    }
97  
98    public void setClose() {
99      currentScannerCallable.setClose();
100   }
101 
102   public void setCaching(int caching) {
103     currentScannerCallable.setCaching(caching);
104   }
105 
106   public int getCaching() {
107     return currentScannerCallable.getCaching();
108   }
109 
110   public HRegionInfo getHRegionInfo() {
111     return currentScannerCallable.getHRegionInfo();
112   }
113 
114   public boolean getServerHasMoreResults() {
115     return currentScannerCallable.getServerHasMoreResults();
116   }
117 
118   public void setServerHasMoreResults(boolean serverHasMoreResults) {
119     currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
120   }
121 
122   public boolean hasMoreResultsContext() {
123     return currentScannerCallable.hasMoreResultsContext();
124   }
125 
126   public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
127     currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
128   }
129 
130   @Override
131   public Result [] call(int timeout) throws IOException {
132     // If the active replica callable was closed somewhere, invoke the RPC to
133     // really close it. In the case of regular scanners, this applies. We make couple
134     // of RPCs to a RegionServer, and when that region is exhausted, we set
135     // the closed flag. Then an RPC is required to actually close the scanner.
136     if (currentScannerCallable != null && currentScannerCallable.closed) {
137       // For closing we target that exact scanner (and not do replica fallback like in
138       // the case of normal reads)
139       if (LOG.isTraceEnabled()) {
140         LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
141       }
142       Result[] r = currentScannerCallable.call(timeout);
143       currentScannerCallable = null;
144       return r;
145     }
146     // We need to do the following:
147     //1. When a scan goes out to a certain replica (default or not), we need to
148     //   continue to hit that until there is a failure. So store the last successfully invoked
149     //   replica
150     //2. We should close the "losing" scanners (scanners other than the ones we hear back
151     //   from first)
152     //
153     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
154         RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
155         currentScannerCallable.getRow());
156 
157     // allocate a boundedcompletion pool of some multiple of number of replicas.
158     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
159     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
160         new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
161             new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
162             rl.size() * 5);
163 
164     List<ExecutionException> exceptions = null;
165     int submitted = 0, completed = 0;
166     AtomicBoolean done = new AtomicBoolean(false);
167     replicaSwitched.set(false);
168     // submit call for the primary replica.
169     submitted += addCallsForCurrentReplica(cs, rl);
170     try {
171       // wait for the timeout to see whether the primary responds back
172       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
173           TimeUnit.MICROSECONDS); // Yes, microseconds
174       if (f != null) {
175         Pair<Result[], ScannerCallable> r = f.get();
176         if (r != null && r.getSecond() != null) {
177           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
178         }
179         return r == null ? null : r.getFirst(); //great we got a response
180       }
181     } catch (ExecutionException e) {
182       // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
183       // but the secondaries might still succeed. Continue on the replica RPCs.
184       exceptions = new ArrayList<ExecutionException>(rl.size());
185       exceptions.add(e);
186       completed++;
187     } catch (CancellationException e) {
188       throw new InterruptedIOException(e.getMessage());
189     } catch (InterruptedException e) {
190       throw new InterruptedIOException(e.getMessage());
191     }
192     // submit call for the all of the secondaries at once
193     // TODO: this may be an overkill for large region replication
194     submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
195     try {
196       while (completed < submitted) {
197         try {
198           Future<Pair<Result[], ScannerCallable>> f = cs.take();
199           Pair<Result[], ScannerCallable> r = f.get();
200           if (r != null && r.getSecond() != null) {
201             updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
202           }
203           return r == null ? null : r.getFirst(); // great we got an answer
204         } catch (ExecutionException e) {
205           // if not cancel or interrupt, wait until all RPC's are done
206           // one of the tasks failed. Save the exception for later.
207           if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
208           exceptions.add(e);
209           completed++;
210         }
211       }
212     } catch (CancellationException e) {
213       throw new InterruptedIOException(e.getMessage());
214     } catch (InterruptedException e) {
215       throw new InterruptedIOException(e.getMessage());
216     } finally {
217       // We get there because we were interrupted or because one or more of the
218       // calls succeeded or failed. In all case, we stop all our tasks.
219       cs.cancelAll();
220     }
221 
222     if (exceptions != null && !exceptions.isEmpty()) {
223       RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0),
224           retries); // just rethrow the first exception for now.
225     }
226     return null; // unreachable
227   }
228 
229   private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
230       AtomicBoolean done, ExecutorService pool) {
231     if (done.compareAndSet(false, true)) {
232       if (currentScannerCallable != scanner) replicaSwitched.set(true);
233       currentScannerCallable = scanner;
234       // store where to start the replica scanner from if we need to.
235       if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
236       if (LOG.isTraceEnabled()) {
237         LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
238             " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
239       }
240       // close all outstanding replica scanners but the one we heard back from
241       outstandingCallables.remove(scanner);
242       for (ScannerCallable s : outstandingCallables) {
243         if (LOG.isTraceEnabled()) {
244           LOG.trace("Closing scanner id=" + s.scannerId +
245             ", replica=" + s.getHRegionInfo().getRegionId() +
246             " because slow and replica=" +
247             this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
248         }
249         // Submit the "close" to the pool since this might take time, and we don't
250         // want to wait for the "close" to happen yet. The "wait" will happen when
251         // the table is closed (when the awaitTermination of the underlying pool is called)
252         s.setClose();
253         final RetryingRPC r = new RetryingRPC(s);
254         pool.submit(new Callable<Void>(){
255           @Override
256           public Void call() throws Exception {
257             r.call(scannerTimeout);
258             return null;
259           }
260         });
261       }
262       // now clear outstandingCallables since we scheduled a close for all the contained scanners
263       outstandingCallables.clear();
264     }
265   }
266 
267   /**
268    * When a scanner switches in the middle of scanning (the 'next' call fails
269    * for example), the upper layer {@link ClientScanner} needs to know
270    * @return
271    */
272   public boolean switchedToADifferentReplica() {
273     return replicaSwitched.get();
274   }
275 
276   /**
277    * @return true when the most recent RPC response indicated that the response was a heartbeat
278    *         message. Heartbeat messages are sent back from the server when the processing of the
279    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
280    *         timeouts during long running scan operations.
281    */
282   public boolean isHeartbeatMessage() {
283     return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
284   }
285 
286   private int addCallsForCurrentReplica(
287       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
288     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
289     outstandingCallables.add(currentScannerCallable);
290     cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
291     return 1;
292   }
293 
294   private int addCallsForOtherReplicas(
295       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
296       int min, int max) {
297     if (scan.getConsistency() == Consistency.STRONG) {
298       return 0; // not scheduling on other replicas for strong consistency
299     }
300     for (int id = min; id <= max; id++) {
301       if (currentScannerCallable.id == id) {
302         continue; //this was already scheduled earlier
303       }
304       ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
305       setStartRowForReplicaCallable(s);
306       outstandingCallables.add(s);
307       RetryingRPC retryingOnReplica = new RetryingRPC(s);
308       cs.submit(retryingOnReplica, scannerTimeout, id);
309     }
310     return max - min + 1;
311   }
312 
313   /**
314    * Set the start row for the replica callable based on the state of the last result received.
315    * @param callable The callable to set the start row on
316    */
317   private void setStartRowForReplicaCallable(ScannerCallable callable) {
318     if (this.lastResult == null || callable == null) return;
319 
320     if (this.lastResult.isPartial()) {
321       // The last result was a partial result which means we have not received all of the cells
322       // for this row. Thus, use the last result's row as the start row. If a replica switch
323       // occurs, the scanner will ensure that any accumulated partial results are cleared,
324       // and the scan can resume from this row.
325       callable.getScan().setStartRow(this.lastResult.getRow());
326     } else {
327       // The last result was not a partial result which means it contained all of the cells for
328       // that row (we no longer need any information from it). Set the start row to the next
329       // closest row that could be seen.
330       if (callable.getScan().isReversed()) {
331         callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
332       } else {
333         callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
334       }
335     }
336   }
337 
338   @VisibleForTesting
339   boolean isAnyRPCcancelled() {
340     return someRPCcancelled;
341   }
342 
343   class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
344     final ScannerCallable callable;
345     RpcRetryingCaller<Result[]> caller;
346     private volatile boolean cancelled = false;
347 
348     RetryingRPC(ScannerCallable callable) {
349       this.callable = callable;
350       // For the Consistency.STRONG (default case), we reuse the caller
351       // to keep compatibility with what is done in the past
352       // For the Consistency.TIMELINE case, we can't reuse the caller
353       // since we could be making parallel RPCs (caller.callWithRetries is synchronized
354       // and we can't invoke it multiple times at the same time)
355       this.caller = ScannerCallableWithReplicas.this.caller;
356       if (scan.getConsistency() == Consistency.TIMELINE) {
357         this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
358             <Result[]>newCaller();
359       }
360     }
361 
362     @Override
363     public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
364       // since the retries is done within the ResultBoundedCompletionService,
365       // we don't invoke callWithRetries here
366       if (cancelled) {
367         return null;
368       }
369       Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
370       return new Pair<Result[], ScannerCallable>(res, this.callable);
371     }
372 
373     @Override
374     public void prepare(boolean reload) throws IOException {
375       if (cancelled) return;
376 
377       if (Thread.interrupted()) {
378         throw new InterruptedIOException();
379       }
380 
381       callable.prepare(reload);
382     }
383 
384     @Override
385     public void throwable(Throwable t, boolean retrying) {
386       callable.throwable(t, retrying);
387     }
388 
389     @Override
390     public String getExceptionMessageAdditionalDetail() {
391       return callable.getExceptionMessageAdditionalDetail();
392     }
393 
394     @Override
395     public long sleep(long pause, int tries) {
396       return callable.sleep(pause, tries);
397     }
398 
399     @Override
400     public void cancel() {
401       cancelled = true;
402       caller.cancel();
403       if (callable.getController() != null) {
404         callable.getController().startCancel();
405       }
406       someRPCcancelled = true;
407     }
408 
409     @Override
410     public boolean isCancelled() {
411       return cancelled;
412     }
413   }
414 
415   @Override
416   public void prepare(boolean reload) throws IOException {
417   }
418 
419   @Override
420   public void throwable(Throwable t, boolean retrying) {
421     currentScannerCallable.throwable(t, retrying);
422   }
423 
424   @Override
425   public String getExceptionMessageAdditionalDetail() {
426     return currentScannerCallable.getExceptionMessageAdditionalDetail();
427   }
428 
429   @Override
430   public long sleep(long pause, int tries) {
431     return currentScannerCallable.sleep(pause, tries);
432   }
433 }