View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
22  
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.util.HashSet;
26  import java.util.Set;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.CancellationException;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.TimeoutException;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.HRegionInfo;
40  import org.apache.hadoop.hbase.RegionLocations;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.Pair;
45  
46  import com.google.common.annotations.VisibleForTesting;
47  
48  /**
49   * This class has the logic for handling scanners for regions with and without replicas.
50   * 1. A scan is attempted on the default (primary) region
 * 2. The scanner sends all the RPCs to the default region until it is done, or, there
 * is a timeout on the default (a negative timeout is disallowed).
53   * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
54   * 4. The results from the first successful scanner are taken, and it is stored which server
55   * returned the results.
56   * 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
57   * in which case, the other replicas are queried (as in (3) above).
58   *
59   */
60  @InterfaceAudience.Private
61  class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
62    private static final Log LOG = LogFactory.getLog(ScannerCallableWithReplicas.class);
63    volatile ScannerCallable currentScannerCallable;
64    AtomicBoolean replicaSwitched = new AtomicBoolean(false);
65    final ClusterConnection cConnection;
66    protected final ExecutorService pool;
67    protected final int timeBeforeReplicas;
68    private final Scan scan;
69    private final int retries;
70    private Result lastResult;
71    private final RpcRetryingCaller<Result[]> caller;
72    private final TableName tableName;
73    private Configuration conf;
74    private int scannerTimeout;
75    private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
76    private boolean someRPCcancelled = false; //required for testing purposes only
77  
78    public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
79        ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
80        int retries, int scannerTimeout, int caching, Configuration conf,
81        RpcRetryingCaller<Result []> caller) {
82      this.currentScannerCallable = baseCallable;
83      this.cConnection = cConnection;
84      this.pool = pool;
85      if (timeBeforeReplicas < 0) {
86        throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
87      }
88      this.timeBeforeReplicas = timeBeforeReplicas;
89      this.scan = scan;
90      this.retries = retries;
91      this.tableName = tableName;
92      this.conf = conf;
93      this.scannerTimeout = scannerTimeout;
94      this.caller = caller;
95    }
96  
97    public void setClose() {
98      currentScannerCallable.setClose();
99    }
100 
101   public void setRenew(boolean val) {
102     currentScannerCallable.setRenew(val);
103   }
104 
105   public void setCaching(int caching) {
106     currentScannerCallable.setCaching(caching);
107   }
108 
109   public int getCaching() {
110     return currentScannerCallable.getCaching();
111   }
112 
113   public HRegionInfo getHRegionInfo() {
114     return currentScannerCallable.getHRegionInfo();
115   }
116 
117   public boolean getServerHasMoreResults() {
118     return currentScannerCallable.getServerHasMoreResults();
119   }
120 
121   public void setServerHasMoreResults(boolean serverHasMoreResults) {
122     currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
123   }
124 
125   public boolean hasMoreResultsContext() {
126     return currentScannerCallable.hasMoreResultsContext();
127   }
128 
129   public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
130     currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
131   }
132 
133   @Override
134   public Result [] call(int timeout) throws IOException {
135     // If the active replica callable was closed somewhere, invoke the RPC to
136     // really close it. In the case of regular scanners, this applies. We make couple
137     // of RPCs to a RegionServer, and when that region is exhausted, we set
138     // the closed flag. Then an RPC is required to actually close the scanner.
139     if (currentScannerCallable != null && currentScannerCallable.closed) {
140       // For closing we target that exact scanner (and not do replica fallback like in
141       // the case of normal reads)
142       if (LOG.isTraceEnabled()) {
143         LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
144       }
145       Result[] r = currentScannerCallable.call(timeout);
146       currentScannerCallable = null;
147       return r;
148     }
149     // We need to do the following:
150     //1. When a scan goes out to a certain replica (default or not), we need to
151     //   continue to hit that until there is a failure. So store the last successfully invoked
152     //   replica
153     //2. We should close the "losing" scanners (scanners other than the ones we hear back
154     //   from first)
155     //
156     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
157         RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
158         currentScannerCallable.getRow());
159 
160     // allocate a boundedcompletion pool of some multiple of number of replicas.
161     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
162     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
163         new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
164             RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
165             rl.size() * 5);
166 
167     AtomicBoolean done = new AtomicBoolean(false);
168     replicaSwitched.set(false);
169     // submit call for the primary replica.
170     addCallsForCurrentReplica(cs, rl);
171 
172     try {
173       // wait for the timeout to see whether the primary responds back
174       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
175           TimeUnit.MICROSECONDS); // Yes, microseconds
176       if (f != null) {
177         Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
178         if (r != null && r.getSecond() != null) {
179           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
180         }
181         return r == null ? null : r.getFirst(); //great we got a response
182       }
183     } catch (ExecutionException e) {
184       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
185     } catch (CancellationException e) {
186       throw new InterruptedIOException(e.getMessage());
187     } catch (InterruptedException e) {
188       throw new InterruptedIOException(e.getMessage());
189     } catch (TimeoutException e) {
190       throw new InterruptedIOException(e.getMessage());
191     }
192 
193     // submit call for the all of the secondaries at once
194     // TODO: this may be an overkill for large region replication
195     addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
196 
197     try {
198       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
199       if (f != null) {
200         Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
201         if (r != null && r.getSecond() != null) {
202           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
203         }
204         return r == null ? null : r.getFirst(); // great we got an answer
205       }
206     } catch (ExecutionException e) {
207       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
208     } catch (CancellationException e) {
209       throw new InterruptedIOException(e.getMessage());
210     } catch (InterruptedException e) {
211       throw new InterruptedIOException(e.getMessage());
212     } catch (TimeoutException e) {
213       throw new InterruptedIOException(e.getMessage());
214     } finally {
215       // We get there because we were interrupted or because one or more of the
216       // calls succeeded or failed. In all case, we stop all our tasks.
217       cs.cancelAll();
218     }
219     return null; // unreachable
220   }
221 
222   private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
223       AtomicBoolean done, ExecutorService pool) {
224     if (done.compareAndSet(false, true)) {
225       if (currentScannerCallable != scanner) replicaSwitched.set(true);
226       currentScannerCallable = scanner;
227       // store where to start the replica scanner from if we need to.
228       if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
229       if (LOG.isTraceEnabled()) {
230         LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
231             " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
232       }
233       // close all outstanding replica scanners but the one we heard back from
234       outstandingCallables.remove(scanner);
235       for (ScannerCallable s : outstandingCallables) {
236         if (LOG.isTraceEnabled()) {
237           LOG.trace("Closing scanner id=" + s.scannerId +
238             ", replica=" + s.getHRegionInfo().getRegionId() +
239             " because slow and replica=" +
240             this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
241         }
242         // Submit the "close" to the pool since this might take time, and we don't
243         // want to wait for the "close" to happen yet. The "wait" will happen when
244         // the table is closed (when the awaitTermination of the underlying pool is called)
245         s.setClose();
246         final RetryingRPC r = new RetryingRPC(s);
247         pool.submit(new Callable<Void>(){
248           @Override
249           public Void call() throws Exception {
250             r.call(scannerTimeout);
251             return null;
252           }
253         });
254       }
255       // now clear outstandingCallables since we scheduled a close for all the contained scanners
256       outstandingCallables.clear();
257     }
258   }
259 
260   /**
261    * When a scanner switches in the middle of scanning (the 'next' call fails
262    * for example), the upper layer {@link ClientScanner} needs to know
263    * @return
264    */
265   public boolean switchedToADifferentReplica() {
266     return replicaSwitched.get();
267   }
268 
269   /**
270    * @return true when the most recent RPC response indicated that the response was a heartbeat
271    *         message. Heartbeat messages are sent back from the server when the processing of the
272    *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
273    *         timeouts during long running scan operations.
274    */
275   public boolean isHeartbeatMessage() {
276     return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
277   }
278 
279   private void addCallsForCurrentReplica(
280       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
281     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
282     outstandingCallables.add(currentScannerCallable);
283     cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
284   }
285 
286   private void addCallsForOtherReplicas(
287       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
288       int min, int max) {
289     if (scan.getConsistency() == Consistency.STRONG) {
290       return; // not scheduling on other replicas for strong consistency
291     }
292     for (int id = min; id <= max; id++) {
293       if (currentScannerCallable.id == id) {
294         continue; //this was already scheduled earlier
295       }
296       ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
297       setStartRowForReplicaCallable(s);
298       outstandingCallables.add(s);
299       RetryingRPC retryingOnReplica = new RetryingRPC(s);
300       cs.submit(retryingOnReplica, scannerTimeout, id);
301     }
302   }
303 
304   /**
305    * Set the start row for the replica callable based on the state of the last result received.
306    * @param callable The callable to set the start row on
307    */
308   private void setStartRowForReplicaCallable(ScannerCallable callable) {
309     if (this.lastResult == null || callable == null) return;
310 
311     if (this.lastResult.isPartial()) {
312       // The last result was a partial result which means we have not received all of the cells
313       // for this row. Thus, use the last result's row as the start row. If a replica switch
314       // occurs, the scanner will ensure that any accumulated partial results are cleared,
315       // and the scan can resume from this row.
316       callable.getScan().setStartRow(this.lastResult.getRow());
317     } else {
318       // The last result was not a partial result which means it contained all of the cells for
319       // that row (we no longer need any information from it). Set the start row to the next
320       // closest row that could be seen.
321       if (callable.getScan().isReversed()) {
322         callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
323       } else {
324         callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
325       }
326     }
327   }
328 
329   @VisibleForTesting
330   boolean isAnyRPCcancelled() {
331     return someRPCcancelled;
332   }
333 
334   class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
335     final ScannerCallable callable;
336     RpcRetryingCaller<Result[]> caller;
337     private volatile boolean cancelled = false;
338 
339     RetryingRPC(ScannerCallable callable) {
340       this.callable = callable;
341       // For the Consistency.STRONG (default case), we reuse the caller
342       // to keep compatibility with what is done in the past
343       // For the Consistency.TIMELINE case, we can't reuse the caller
344       // since we could be making parallel RPCs (caller.callWithRetries is synchronized
345       // and we can't invoke it multiple times at the same time)
346       this.caller = ScannerCallableWithReplicas.this.caller;
347       if (scan.getConsistency() == Consistency.TIMELINE) {
348         this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
349             .<Result[]>newCaller();
350       }
351     }
352 
353     @Override
354     public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
355       // since the retries is done within the ResultBoundedCompletionService,
356       // we don't invoke callWithRetries here
357       if (cancelled) {
358         return null;
359       }
360       Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
361       return new Pair<Result[], ScannerCallable>(res, this.callable);
362     }
363 
364     @Override
365     public void prepare(boolean reload) throws IOException {
366       if (cancelled) return;
367 
368       if (Thread.interrupted()) {
369         throw new InterruptedIOException();
370       }
371 
372       callable.prepare(reload);
373     }
374 
375     @Override
376     public void throwable(Throwable t, boolean retrying) {
377       callable.throwable(t, retrying);
378     }
379 
380     @Override
381     public String getExceptionMessageAdditionalDetail() {
382       return callable.getExceptionMessageAdditionalDetail();
383     }
384 
385     @Override
386     public long sleep(long pause, int tries) {
387       return callable.sleep(pause, tries);
388     }
389 
390     @Override
391     public void cancel() {
392       cancelled = true;
393       caller.cancel();
394       if (callable.getController() != null) {
395         callable.getController().startCancel();
396       }
397       someRPCcancelled = true;
398     }
399 
400     @Override
401     public boolean isCancelled() {
402       return cancelled;
403     }
404   }
405 
406   @Override
407   public void prepare(boolean reload) throws IOException {
408   }
409 
410   @Override
411   public void throwable(Throwable t, boolean retrying) {
412     currentScannerCallable.throwable(t, retrying);
413   }
414 
415   @Override
416   public String getExceptionMessageAdditionalDetail() {
417     return currentScannerCallable.getExceptionMessageAdditionalDetail();
418   }
419 
420   @Override
421   public long sleep(long pause, int tries) {
422     return currentScannerCallable.sleep(pause, tries);
423   }
424 }