View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
22  
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.util.HashSet;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.CancellationException;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.RegionLocations;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.Pair;
44  
45  import com.google.common.annotations.VisibleForTesting;
46  
47  /**
48   * This class has the logic for handling scanners for regions with and without replicas.
49   * 1. A scan is attempted on the default (primary) region
50   * 2. The scanner sends all the RPCs to the default region until it is done, or, there
51   * is a timeout on the default (a timeout of zero is disallowed).
52   * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
53   * 4. The results from the first successful scanner are taken, and it is stored which server
54   * returned the results.
55   * 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
56   * in which case, the other replicas are queried (as in (3) above).
57   *
58   */
59  @InterfaceAudience.Private
60  class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
61    private static final Log LOG = LogFactory.getLog(ScannerCallableWithReplicas.class);
62    volatile ScannerCallable currentScannerCallable;
63    AtomicBoolean replicaSwitched = new AtomicBoolean(false);
64    final ClusterConnection cConnection;
65    protected final ExecutorService pool;
66    protected final int timeBeforeReplicas;
67    private final Scan scan;
68    private final int retries;
69    private Result lastResult;
70    private final RpcRetryingCaller<Result[]> caller;
71    private final TableName tableName;
72    private Configuration conf;
73    private int scannerTimeout;
74    private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
75    private boolean someRPCcancelled = false; //required for testing purposes only
76  
77    public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
78        ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
79        int retries, int scannerTimeout, int caching, Configuration conf,
80        RpcRetryingCaller<Result []> caller) {
81      this.currentScannerCallable = baseCallable;
82      this.cConnection = cConnection;
83      this.pool = pool;
84      if (timeBeforeReplicas < 0) {
85        throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
86      }
87      this.timeBeforeReplicas = timeBeforeReplicas;
88      this.scan = scan;
89      this.retries = retries;
90      this.tableName = tableName;
91      this.conf = conf;
92      this.scannerTimeout = scannerTimeout;
93      this.caller = caller;
94    }
95  
96    public void setClose() {
97      currentScannerCallable.setClose();
98    }
99  
100   public void setCaching(int caching) {
101     currentScannerCallable.setCaching(caching);
102   }
103 
104   public int getCaching() {
105     return currentScannerCallable.getCaching();
106   }
107 
108   public HRegionInfo getHRegionInfo() {
109     return currentScannerCallable.getHRegionInfo();
110   }
111 
112   public boolean getServerHasMoreResults() {
113     return currentScannerCallable.getServerHasMoreResults();
114   }
115 
116   public void setServerHasMoreResults(boolean serverHasMoreResults) {
117     currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
118   }
119 
120   public boolean hasMoreResultsContext() {
121     return currentScannerCallable.hasMoreResultsContext();
122   }
123 
124   public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
125     currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
126   }
127 
128   @Override
129   public Result [] call(int timeout) throws IOException {
130     // If the active replica callable was closed somewhere, invoke the RPC to
131     // really close it. In the case of regular scanners, this applies. We make couple
132     // of RPCs to a RegionServer, and when that region is exhausted, we set
133     // the closed flag. Then an RPC is required to actually close the scanner.
134     if (currentScannerCallable != null && currentScannerCallable.closed) {
135       // For closing we target that exact scanner (and not do replica fallback like in
136       // the case of normal reads)
137       if (LOG.isTraceEnabled()) {
138         LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
139       }
140       Result[] r = currentScannerCallable.call(timeout);
141       currentScannerCallable = null;
142       return r;
143     }
144     // We need to do the following:
145     //1. When a scan goes out to a certain replica (default or not), we need to
146     //   continue to hit that until there is a failure. So store the last successfully invoked
147     //   replica
148     //2. We should close the "losing" scanners (scanners other than the ones we hear back
149     //   from first)
150     //
151     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
152         RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
153         currentScannerCallable.getRow());
154 
155     // allocate a boundedcompletion pool of some multiple of number of replicas.
156     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
157     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
158         new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
159             RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
160             rl.size() * 5);
161 
162     AtomicBoolean done = new AtomicBoolean(false);
163     replicaSwitched.set(false);
164     // submit call for the primary replica.
165     addCallsForCurrentReplica(cs, rl);
166     try {
167       // wait for the timeout to see whether the primary responds back
168       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
169           TimeUnit.MICROSECONDS); // Yes, microseconds
170       if (f != null) {
171         Pair<Result[], ScannerCallable> r = f.get();
172         if (r != null && r.getSecond() != null) {
173           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
174         }
175         return r == null ? null : r.getFirst(); //great we got a response
176       }
177     } catch (ExecutionException e) {
178       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
179     } catch (CancellationException e) {
180       throw new InterruptedIOException(e.getMessage());
181     } catch (InterruptedException e) {
182       throw new InterruptedIOException(e.getMessage());
183     }
184     // submit call for the all of the secondaries at once
185     // TODO: this may be an overkill for large region replication
186     addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
187     try {
188       Future<Pair<Result[], ScannerCallable>> f = cs.take();
189       Pair<Result[], ScannerCallable> r = f.get();
190       if (r != null && r.getSecond() != null) {
191         updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
192       }
193       return r == null ? null : r.getFirst(); // great we got an answer
194     } catch (ExecutionException e) {
195       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
196     } catch (CancellationException e) {
197       throw new InterruptedIOException(e.getMessage());
198     } catch (InterruptedException e) {
199       throw new InterruptedIOException(e.getMessage());
200     } finally {
201       // We get there because we were interrupted or because one or more of the
202       // calls succeeded or failed. In all case, we stop all our tasks.
203       cs.cancelAll();
204     }
205     return null; // unreachable
206   }
207 
208   private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
209       AtomicBoolean done, ExecutorService pool) {
210     if (done.compareAndSet(false, true)) {
211       if (currentScannerCallable != scanner) replicaSwitched.set(true);
212       currentScannerCallable = scanner;
213       // store where to start the replica scanner from if we need to.
214       if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
215       if (LOG.isTraceEnabled()) {
216         LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
217             " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
218       }
219       // close all outstanding replica scanners but the one we heard back from
220       outstandingCallables.remove(scanner);
221       for (ScannerCallable s : outstandingCallables) {
222         if (LOG.isTraceEnabled()) {
223           LOG.trace("Closing scanner id=" + s.scannerId +
224             ", replica=" + s.getHRegionInfo().getRegionId() +
225             " because slow and replica=" +
226             this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
227         }
228         // Submit the "close" to the pool since this might take time, and we don't
229         // want to wait for the "close" to happen yet. The "wait" will happen when
230         // the table is closed (when the awaitTermination of the underlying pool is called)
231         s.setClose();
232         final RetryingRPC r = new RetryingRPC(s);
233         pool.submit(new Callable<Void>(){
234           @Override
235           public Void call() throws Exception {
236             r.call(scannerTimeout);
237             return null;
238           }
239         });
240       }
241       // now clear outstandingCallables since we scheduled a close for all the contained scanners
242       outstandingCallables.clear();
243     }
244   }
245 
246   /**
247    * When a scanner switches in the middle of scanning (the 'next' call fails
248    * for example), the upper layer {@link ClientScanner} needs to know
249    * @return
250    */
251   public boolean switchedToADifferentReplica() {
252     return replicaSwitched.get();
253   }
254 
255   /**
256    * @return true when the most recent RPC response indicated that the response was a heartbeat
257    *         message. Heartbeat messages are sent back from the server when the processing of the
258    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
259    *         timeouts during long running scan operations.
260    */
261   public boolean isHeartbeatMessage() {
262     return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
263   }
264 
265   private void addCallsForCurrentReplica(
266       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
267     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
268     outstandingCallables.add(currentScannerCallable);
269     cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
270   }
271 
272   private void addCallsForOtherReplicas(
273       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
274       int min, int max) {
275     if (scan.getConsistency() == Consistency.STRONG) {
276       return; // not scheduling on other replicas for strong consistency
277     }
278     for (int id = min; id <= max; id++) {
279       if (currentScannerCallable.id == id) {
280         continue; //this was already scheduled earlier
281       }
282       ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
283       setStartRowForReplicaCallable(s);
284       outstandingCallables.add(s);
285       RetryingRPC retryingOnReplica = new RetryingRPC(s);
286       cs.submit(retryingOnReplica, scannerTimeout, id);
287     }
288   }
289 
290   /**
291    * Set the start row for the replica callable based on the state of the last result received.
292    * @param callable The callable to set the start row on
293    */
294   private void setStartRowForReplicaCallable(ScannerCallable callable) {
295     if (this.lastResult == null || callable == null) return;
296 
297     if (this.lastResult.isPartial()) {
298       // The last result was a partial result which means we have not received all of the cells
299       // for this row. Thus, use the last result's row as the start row. If a replica switch
300       // occurs, the scanner will ensure that any accumulated partial results are cleared,
301       // and the scan can resume from this row.
302       callable.getScan().setStartRow(this.lastResult.getRow());
303     } else {
304       // The last result was not a partial result which means it contained all of the cells for
305       // that row (we no longer need any information from it). Set the start row to the next
306       // closest row that could be seen.
307       if (callable.getScan().isReversed()) {
308         callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
309       } else {
310         callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
311       }
312     }
313   }
314 
315   @VisibleForTesting
316   boolean isAnyRPCcancelled() {
317     return someRPCcancelled;
318   }
319 
320   class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
321     final ScannerCallable callable;
322     RpcRetryingCaller<Result[]> caller;
323     private volatile boolean cancelled = false;
324 
325     RetryingRPC(ScannerCallable callable) {
326       this.callable = callable;
327       // For the Consistency.STRONG (default case), we reuse the caller
328       // to keep compatibility with what is done in the past
329       // For the Consistency.TIMELINE case, we can't reuse the caller
330       // since we could be making parallel RPCs (caller.callWithRetries is synchronized
331       // and we can't invoke it multiple times at the same time)
332       this.caller = ScannerCallableWithReplicas.this.caller;
333       if (scan.getConsistency() == Consistency.TIMELINE) {
334         this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
335             .<Result[]>newCaller();
336       }
337     }
338 
339     @Override
340     public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
341       // since the retries is done within the ResultBoundedCompletionService,
342       // we don't invoke callWithRetries here
343       if (cancelled) {
344         return null;
345       }
346       Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
347       return new Pair<Result[], ScannerCallable>(res, this.callable);
348     }
349 
350     @Override
351     public void prepare(boolean reload) throws IOException {
352       if (cancelled) return;
353 
354       if (Thread.interrupted()) {
355         throw new InterruptedIOException();
356       }
357 
358       callable.prepare(reload);
359     }
360 
361     @Override
362     public void throwable(Throwable t, boolean retrying) {
363       callable.throwable(t, retrying);
364     }
365 
366     @Override
367     public String getExceptionMessageAdditionalDetail() {
368       return callable.getExceptionMessageAdditionalDetail();
369     }
370 
371     @Override
372     public long sleep(long pause, int tries) {
373       return callable.sleep(pause, tries);
374     }
375 
376     @Override
377     public void cancel() {
378       cancelled = true;
379       caller.cancel();
380       if (callable.getController() != null) {
381         callable.getController().startCancel();
382       }
383       someRPCcancelled = true;
384     }
385 
386     @Override
387     public boolean isCancelled() {
388       return cancelled;
389     }
390   }
391 
392   @Override
393   public void prepare(boolean reload) throws IOException {
394   }
395 
396   @Override
397   public void throwable(Throwable t, boolean retrying) {
398     currentScannerCallable.throwable(t, retrying);
399   }
400 
401   @Override
402   public String getExceptionMessageAdditionalDetail() {
403     return currentScannerCallable.getExceptionMessageAdditionalDetail();
404   }
405 
406   @Override
407   public long sleep(long pause, int tries) {
408     return currentScannerCallable.sleep(pause, tries);
409   }
410 }