View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
22  
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.util.HashSet;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.CancellationException;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.TimeoutException;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.RegionLocations;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.Pair;
45  
46  import com.google.common.annotations.VisibleForTesting;
47  
48  /**
49   * This class has the logic for handling scanners for regions with and without replicas.
50   * 1. A scan is attempted on the default (primary) region
51   * 2. The scanner sends all the RPCs to the default region until it is done, or, there
52   * is a timeout on the default (a timeout of zero is disallowed).
53   * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
54   * 4. The results from the first successful scanner are taken, and it is stored which server
55   * returned the results.
56   * 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
57   * in which case, the other replicas are queried (as in (3) above).
58   *
59   */
60  @InterfaceAudience.Private
61  class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
62    private static final Log LOG = LogFactory.getLog(ScannerCallableWithReplicas.class);
63    volatile ScannerCallable currentScannerCallable;
64    AtomicBoolean replicaSwitched = new AtomicBoolean(false);
65    final ClusterConnection cConnection;
66    protected final ExecutorService pool;
67    protected final int timeBeforeReplicas;
68    private final Scan scan;
69    private final int retries;
70    private Result lastResult;
71    private final RpcRetryingCaller<Result[]> caller;
72    private final TableName tableName;
73    private Configuration conf;
74    private int scannerTimeout;
75    private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
76    private boolean someRPCcancelled = false; //required for testing purposes only
77  
78    public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
79        ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
80        int retries, int scannerTimeout, int caching, Configuration conf,
81        RpcRetryingCaller<Result []> caller) {
82      this.currentScannerCallable = baseCallable;
83      this.cConnection = cConnection;
84      this.pool = pool;
85      if (timeBeforeReplicas < 0) {
86        throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
87      }
88      this.timeBeforeReplicas = timeBeforeReplicas;
89      this.scan = scan;
90      this.retries = retries;
91      this.tableName = tableName;
92      this.conf = conf;
93      this.scannerTimeout = scannerTimeout;
94      this.caller = caller;
95    }
96  
97    public void setClose() {
98      currentScannerCallable.setClose();
99    }
100 
101   public void setRenew(boolean val) {
102     currentScannerCallable.setRenew(val);
103   }
104 
105   public void setCaching(int caching) {
106     currentScannerCallable.setCaching(caching);
107   }
108 
109   public int getCaching() {
110     return currentScannerCallable.getCaching();
111   }
112 
113   public HRegionInfo getHRegionInfo() {
114     return currentScannerCallable.getHRegionInfo();
115   }
116 
117   public boolean getServerHasMoreResults() {
118     return currentScannerCallable.getServerHasMoreResults();
119   }
120 
121   public void setServerHasMoreResults(boolean serverHasMoreResults) {
122     currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
123   }
124 
125   public boolean hasMoreResultsContext() {
126     return currentScannerCallable.hasMoreResultsContext();
127   }
128 
129   public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
130     currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
131   }
132 
133   @Override
134   public Result [] call(int timeout) throws IOException {
135     // If the active replica callable was closed somewhere, invoke the RPC to
136     // really close it. In the case of regular scanners, this applies. We make couple
137     // of RPCs to a RegionServer, and when that region is exhausted, we set
138     // the closed flag. Then an RPC is required to actually close the scanner.
139     if (currentScannerCallable != null && currentScannerCallable.closed) {
140       // For closing we target that exact scanner (and not do replica fallback like in
141       // the case of normal reads)
142       if (LOG.isTraceEnabled()) {
143         LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
144       }
145       Result[] r = currentScannerCallable.call(timeout);
146       currentScannerCallable = null;
147       return r;
148     }
149     // We need to do the following:
150     //1. When a scan goes out to a certain replica (default or not), we need to
151     //   continue to hit that until there is a failure. So store the last successfully invoked
152     //   replica
153     //2. We should close the "losing" scanners (scanners other than the ones we hear back
154     //   from first)
155     //
156     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
157         RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
158         currentScannerCallable.getRow());
159 
160     // allocate a boundedcompletion pool of some multiple of number of replicas.
161     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
162     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
163         new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
164             RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
165             rl.size() * 5);
166 
167     AtomicBoolean done = new AtomicBoolean(false);
168     replicaSwitched.set(false);
169     // submit call for the primary replica.
170     addCallsForCurrentReplica(cs, rl);
171 
172     try {
173       // wait for the timeout to see whether the primary responds back
174       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
175           TimeUnit.MICROSECONDS); // Yes, microseconds
176       if (f != null) {
177         Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
178         if (r != null && r.getSecond() != null) {
179           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
180         }
181         return r == null ? null : r.getFirst(); //great we got a response
182       }
183     } catch (ExecutionException e) {
184       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
185     } catch (CancellationException e) {
186       throw new InterruptedIOException(e.getMessage());
187     } catch (InterruptedException e) {
188       throw new InterruptedIOException(e.getMessage());
189     } catch (TimeoutException e) {
190       throw new InterruptedIOException(e.getMessage());
191     }
192 
193     // submit call for the all of the secondaries at once
194     // TODO: this may be an overkill for large region replication
195     addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
196 
197     try {
198       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
199       if (f != null) {
200         Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
201         if (r != null && r.getSecond() != null) {
202           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
203         }
204         return r == null ? null : r.getFirst(); // great we got an answer
205       } else {
206         throw new IOException("Failed to get result within timeout, timeout="
207             + timeout + "ms");
208       }
209     } catch (ExecutionException e) {
210       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
211     } catch (CancellationException e) {
212       throw new InterruptedIOException(e.getMessage());
213     } catch (InterruptedException e) {
214       throw new InterruptedIOException(e.getMessage());
215     } catch (TimeoutException e) {
216       throw new InterruptedIOException(e.getMessage());
217     } finally {
218       // We get there because we were interrupted or because one or more of the
219       // calls succeeded or failed. In all case, we stop all our tasks.
220       cs.cancelAll();
221     }
222     LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
223     throw new IOException("Imposible? Arrive at an unreachable line...");
224   }
225 
226   private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
227       AtomicBoolean done, ExecutorService pool) {
228     if (done.compareAndSet(false, true)) {
229       if (currentScannerCallable != scanner) replicaSwitched.set(true);
230       currentScannerCallable = scanner;
231       // store where to start the replica scanner from if we need to.
232       if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
233       if (LOG.isTraceEnabled()) {
234         LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
235             " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
236       }
237       // close all outstanding replica scanners but the one we heard back from
238       outstandingCallables.remove(scanner);
239       for (ScannerCallable s : outstandingCallables) {
240         if (LOG.isTraceEnabled()) {
241           LOG.trace("Closing scanner id=" + s.scannerId +
242             ", replica=" + s.getHRegionInfo().getRegionId() +
243             " because slow and replica=" +
244             this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
245         }
246         // Submit the "close" to the pool since this might take time, and we don't
247         // want to wait for the "close" to happen yet. The "wait" will happen when
248         // the table is closed (when the awaitTermination of the underlying pool is called)
249         s.setClose();
250         final RetryingRPC r = new RetryingRPC(s);
251         pool.submit(new Callable<Void>(){
252           @Override
253           public Void call() throws Exception {
254             r.call(scannerTimeout);
255             return null;
256           }
257         });
258       }
259       // now clear outstandingCallables since we scheduled a close for all the contained scanners
260       outstandingCallables.clear();
261     }
262   }
263 
264   /**
265    * When a scanner switches in the middle of scanning (the 'next' call fails
266    * for example), the upper layer {@link ClientScanner} needs to know
267    * @return
268    */
269   public boolean switchedToADifferentReplica() {
270     return replicaSwitched.get();
271   }
272 
273   /**
274    * @return true when the most recent RPC response indicated that the response was a heartbeat
275    *         message. Heartbeat messages are sent back from the server when the processing of the
276    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
277    *         timeouts during long running scan operations.
278    */
279   public boolean isHeartbeatMessage() {
280     return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
281   }
282 
283   private void addCallsForCurrentReplica(
284       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
285     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
286     outstandingCallables.add(currentScannerCallable);
287     cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
288   }
289 
290   private void addCallsForOtherReplicas(
291       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
292       int min, int max) {
293     if (scan.getConsistency() == Consistency.STRONG) {
294       return; // not scheduling on other replicas for strong consistency
295     }
296     for (int id = min; id <= max; id++) {
297       if (currentScannerCallable.id == id) {
298         continue; //this was already scheduled earlier
299       }
300       ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
301       setStartRowForReplicaCallable(s);
302       outstandingCallables.add(s);
303       RetryingRPC retryingOnReplica = new RetryingRPC(s);
304       cs.submit(retryingOnReplica, scannerTimeout, id);
305     }
306   }
307 
308   /**
309    * Set the start row for the replica callable based on the state of the last result received.
310    * @param callable The callable to set the start row on
311    */
312   private void setStartRowForReplicaCallable(ScannerCallable callable) {
313     if (this.lastResult == null || callable == null) return;
314 
315     if (this.lastResult.isPartial()) {
316       // The last result was a partial result which means we have not received all of the cells
317       // for this row. Thus, use the last result's row as the start row. If a replica switch
318       // occurs, the scanner will ensure that any accumulated partial results are cleared,
319       // and the scan can resume from this row.
320       callable.getScan().setStartRow(this.lastResult.getRow());
321     } else {
322       // The last result was not a partial result which means it contained all of the cells for
323       // that row (we no longer need any information from it). Set the start row to the next
324       // closest row that could be seen.
325       if (callable.getScan().isReversed()) {
326         callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
327       } else {
328         callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
329       }
330     }
331   }
332 
333   @VisibleForTesting
334   boolean isAnyRPCcancelled() {
335     return someRPCcancelled;
336   }
337 
338   class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
339     final ScannerCallable callable;
340     RpcRetryingCaller<Result[]> caller;
341     private volatile boolean cancelled = false;
342 
343     RetryingRPC(ScannerCallable callable) {
344       this.callable = callable;
345       // For the Consistency.STRONG (default case), we reuse the caller
346       // to keep compatibility with what is done in the past
347       // For the Consistency.TIMELINE case, we can't reuse the caller
348       // since we could be making parallel RPCs (caller.callWithRetries is synchronized
349       // and we can't invoke it multiple times at the same time)
350       this.caller = ScannerCallableWithReplicas.this.caller;
351       if (scan.getConsistency() == Consistency.TIMELINE) {
352         this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
353             .<Result[]>newCaller();
354       }
355     }
356 
357     @Override
358     public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
359       // since the retries is done within the ResultBoundedCompletionService,
360       // we don't invoke callWithRetries here
361       if (cancelled) {
362         return null;
363       }
364       Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
365       return new Pair<Result[], ScannerCallable>(res, this.callable);
366     }
367 
368     @Override
369     public void prepare(boolean reload) throws IOException {
370       if (cancelled) return;
371 
372       if (Thread.interrupted()) {
373         throw new InterruptedIOException();
374       }
375 
376       callable.prepare(reload);
377     }
378 
379     @Override
380     public void throwable(Throwable t, boolean retrying) {
381       callable.throwable(t, retrying);
382     }
383 
384     @Override
385     public String getExceptionMessageAdditionalDetail() {
386       return callable.getExceptionMessageAdditionalDetail();
387     }
388 
389     @Override
390     public long sleep(long pause, int tries) {
391       return callable.sleep(pause, tries);
392     }
393 
394     @Override
395     public void cancel() {
396       cancelled = true;
397       caller.cancel();
398       if (callable.getController() != null) {
399         callable.getController().startCancel();
400       }
401       someRPCcancelled = true;
402     }
403 
404     @Override
405     public boolean isCancelled() {
406       return cancelled;
407     }
408   }
409 
410   @Override
411   public void prepare(boolean reload) throws IOException {
412   }
413 
414   @Override
415   public void throwable(Throwable t, boolean retrying) {
416     currentScannerCallable.throwable(t, retrying);
417   }
418 
419   @Override
420   public String getExceptionMessageAdditionalDetail() {
421     return currentScannerCallable.getExceptionMessageAdditionalDetail();
422   }
423 
424   @Override
425   public long sleep(long pause, int tries) {
426     return currentScannerCallable.sleep(pause, tries);
427   }
428 }