/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.util.Pair;

/**
 * This class has the logic for handling scanners for regions with and without replicas.
 * 1. A scan is attempted on the default (primary) region.
 * 2. The scanner sends all the RPCs to the default region until it is done, or there
 *    is a timeout on the default (a timeout of zero is disallowed).
 * 3. If there is a timeout in (2) above, scanner(s) are opened on the non-default replica(s).
 * 4. The results from the first successful scanner are taken, and it is stored which server
 *    returned the results.
 * 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
 *    in which case the other replicas are queried (as in (3) above).
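 *
 * <p>A rough usage sketch. This is illustrative only: in the client it is {@link ClientScanner}
 * that constructs and drives this callable, and the variable names below (primaryCallable, pool,
 * caller, etc.) are assumed to have been set up elsewhere.
 * <pre>
 * // primaryCallable is a ScannerCallable for the primary replica; caller is an
 * // RpcRetryingCaller obtained from RpcRetryingCallerFactory.
 * ScannerCallableWithReplicas callable = new ScannerCallableWithReplicas(tableName, connection,
 *     primaryCallable, pool, timeBeforeReplicas, scan, retries, scannerTimeout, caching, conf,
 *     caller);
 * Result[] rows = caller.callWithoutRetries(callable, scannerTimeout);
 * if (callable.switchedToADifferentReplica()) {
 *   // the consumer may need to discard partial-row state accumulated from the previous replica
 * }
 * </pre>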
 *
 */
@InterfaceAudience.Private
class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
  private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
  volatile ScannerCallable currentScannerCallable;
  AtomicBoolean replicaSwitched = new AtomicBoolean(false);
  final ClusterConnection cConnection;
  protected final ExecutorService pool;
  protected final int timeBeforeReplicas;
  private final Scan scan;
  private final int retries;
  private Result lastResult;
  private final RpcRetryingCaller<Result[]> caller;
  private final TableName tableName;
  private Configuration conf;
  private int scannerTimeout;
  private Set<ScannerCallable> outstandingCallables = new HashSet<>();
  private boolean someRPCcancelled = false; // required for testing purposes only
  private int regionReplication = 0;

  public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
      ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
      int retries, int scannerTimeout, int caching, Configuration conf,
      RpcRetryingCaller<Result[]> caller) {
    this.currentScannerCallable = baseCallable;
    this.cConnection = cConnection;
    this.pool = pool;
    if (timeBeforeReplicas < 0) {
      throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
    }
    this.timeBeforeReplicas = timeBeforeReplicas;
    this.scan = scan;
    this.retries = retries;
    this.tableName = tableName;
    this.conf = conf;
    this.scannerTimeout = scannerTimeout;
    this.caller = caller;
  }

  public void setClose() {
    currentScannerCallable.setClose();
  }

  public void setRenew(boolean val) {
    currentScannerCallable.setRenew(val);
  }

  public void setCaching(int caching) {
    currentScannerCallable.setCaching(caching);
  }

  public int getCaching() {
    return currentScannerCallable.getCaching();
  }

  public HRegionInfo getHRegionInfo() {
    return currentScannerCallable.getHRegionInfo();
  }

  public MoreResults moreResultsInRegion() {
    return currentScannerCallable.moreResultsInRegion();
  }

  public MoreResults moreResultsForScan() {
    return currentScannerCallable.moreResultsForScan();
  }

  @Override
  public Result[] call(int timeout) throws IOException {
    // If the active replica callable was closed somewhere, invoke the RPC to
    // really close it. In the case of regular scanners, this applies. We make a
    // couple of RPCs to a RegionServer, and when that region is exhausted, we set
    // the closed flag. Then an RPC is required to actually close the scanner.
    if (currentScannerCallable != null && currentScannerCallable.closed) {
      // For closing we target that exact scanner (and do not do replica fallback like in
      // the case of normal reads)
      if (LOG.isTraceEnabled()) {
        LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
      }
      Result[] r = currentScannerCallable.call(timeout);
      currentScannerCallable = null;
      return r;
    }
    // We need to do the following:
    // 1. When a scan goes out to a certain replica (default or not), we need to
    //    continue to hit that until there is a failure. So store the last successfully invoked
    //    replica.
    // 2. We should close the "losing" scanners (scanners other than the ones we hear back
    //    from first).
    //
    // Since RegionReplication is a table attribute, it won't change as long as the table is
    // enabled; it just needs to be set once.

    if (regionReplication <= 0) {
      RegionLocations rl = null;
      try {
        rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
            RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
            currentScannerCallable.getRow());
      } catch (RetriesExhaustedException | DoNotRetryIOException e) {
        // We cannot get the primary replica region location. It is possible that the region
        // server hosting the meta table is down, so proceed to try the cached replicas directly.
        if (cConnection instanceof ConnectionImplementation) {
          rl = ((ConnectionImplementation) cConnection)
              .getCachedLocation(tableName, currentScannerCallable.getRow());
          if (rl == null) {
            throw e;
          }
        } else {
          // For completeness
          throw e;
        }
      }
      regionReplication = rl.size();
    }
    // Allocate a bounded completion pool of some multiple of the number of replicas.
    // We want to accommodate some RPCs for redundant replica scans that are still in progress.
    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
        new ResultBoundedCompletionService<>(
            RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
            regionReplication * 5);

    AtomicBoolean done = new AtomicBoolean(false);
    replicaSwitched.set(false);
    // submit the call for the primary replica.
    addCallsForCurrentReplica(cs);
    int startIndex = 0;

    try {
      // wait for the timeout to see whether the primary responds back
      Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
          TimeUnit.MICROSECONDS); // Yes, microseconds
      if (f != null) {
        // After poll, if f is not null, there must be a completed task
        Pair<Result[], ScannerCallable> r = f.get();
        if (r != null && r.getSecond() != null) {
          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
        }
        return r == null ? null : r.getFirst(); // great, we got a response
      }
    } catch (ExecutionException e) {
      // We ignore the ExecutionException and continue with the replicas
      if (LOG.isDebugEnabled()) {
        LOG.debug("Scan with primary region returns " + e.getCause());
      }

      // If rl's size is 1 or the scan's consistency is strong, we need to throw
      // out the exception from the primary replica
      if ((regionReplication == 1) || (scan.getConsistency() == Consistency.STRONG)) {
        // Rethrow the first exception
        RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
      }

      startIndex = 1;
    } catch (CancellationException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    }

    // submit calls for all of the secondaries at once
    int endIndex = regionReplication;
    if (scan.getConsistency() == Consistency.STRONG) {
      // When the scan's consistency is strong, do not send to the secondaries
      endIndex = 1;
    } else {
      // TODO: this may be overkill for large region replication
      addCallsForOtherReplicas(cs, 0, regionReplication - 1);
    }

    try {
      Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
          TimeUnit.MILLISECONDS, startIndex, endIndex);

      if (f == null) {
        throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
      }
      Pair<Result[], ScannerCallable> r = f.get();

      if (r != null && r.getSecond() != null) {
        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
      }
      return r == null ? null : r.getFirst(); // great, we got an answer

    } catch (ExecutionException e) {
      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
    } catch (CancellationException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    } finally {
      // We get here because we were interrupted or because one or more of the
      // calls succeeded or failed. In all cases, we stop all our tasks.
      cs.cancelAll();
    }
    LOG.error("Impossible? Arrived at an unreachable line..."); // unreachable
    throw new IOException("Impossible? Arrived at an unreachable line...");
  }

  private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
      AtomicBoolean done, ExecutorService pool) {
    if (done.compareAndSet(false, true)) {
      if (currentScannerCallable != scanner) replicaSwitched.set(true);
      currentScannerCallable = scanner;
      // store where to start the replica scanner from if we need to
      if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
      if (LOG.isTraceEnabled()) {
        LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
            " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
      }
      // close all outstanding replica scanners but the one we heard back from
      outstandingCallables.remove(scanner);
      for (ScannerCallable s : outstandingCallables) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Closing scanner id=" + s.scannerId +
              ", replica=" + s.getHRegionInfo().getRegionId() +
              " because slow and replica=" +
              this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
        }
        // Submit the "close" to the pool since this might take time, and we don't
        // want to wait for the "close" to happen yet. The "wait" will happen when
        // the table is closed (when the awaitTermination of the underlying pool is called)
        s.setClose();
        final RetryingRPC r = new RetryingRPC(s);
        pool.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            r.call(scannerTimeout);
            return null;
          }
        });
      }
      // now clear outstandingCallables since we scheduled a close for all the contained scanners
      outstandingCallables.clear();
    }
  }

  /**
   * When a scanner switches in the middle of scanning (the 'next' call fails
   * for example), the upper layer {@link ClientScanner} needs to know.
   */
  public boolean switchedToADifferentReplica() {
    return replicaSwitched.get();
  }

  /**
   * @return true when the most recent RPC response indicated that the response was a heartbeat
   *         message. Heartbeat messages are sent back from the server when the processing of the
   *         scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
   *         timeouts during long running scan operations.
   */
  public boolean isHeartbeatMessage() {
    return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
  }

  public Cursor getCursor() {
    return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
  }

  private void addCallsForCurrentReplica(
      ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
    RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
    outstandingCallables.add(currentScannerCallable);
    cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
  }

  private void addCallsForOtherReplicas(
      ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {

    for (int id = min; id <= max; id++) {
      if (currentScannerCallable.id == id) {
        continue; // this was already scheduled earlier
      }
      ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
      setStartRowForReplicaCallable(s);
      outstandingCallables.add(s);
      RetryingRPC retryingOnReplica = new RetryingRPC(s);
      cs.submit(retryingOnReplica, scannerTimeout, id);
    }
  }

  /**
   * Set the start row for the replica callable based on the state of the last result received.
   * @param callable The callable to set the start row on
   */
  private void setStartRowForReplicaCallable(ScannerCallable callable) {
    if (this.lastResult == null || callable == null) {
      return;
    }
    // 1. The last result was a partial result, which means we have not received all of the cells
    //    for this row. Thus, use the last result's row as the start row. If a replica switch
    //    occurs, the scanner will ensure that any accumulated partial results are cleared,
    //    and the scan can resume from this row.
    // 2. The last result was not a partial result, which means it contained all of the cells for
    //    that row (we no longer need any information from it). Set the start row to the next
    //    closest row that could be seen.
    callable.getScan().withStartRow(this.lastResult.getRow(),
        this.lastResult.mayHaveMoreCellsInRow());
  }

  @VisibleForTesting
  boolean isAnyRPCcancelled() {
    return someRPCcancelled;
  }

  class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
    final ScannerCallable callable;
    RpcRetryingCaller<Result[]> caller;
    private volatile boolean cancelled = false;

    RetryingRPC(ScannerCallable callable) {
      this.callable = callable;
      // For the Consistency.STRONG (default) case, we reuse the caller
      // to keep compatibility with what was done in the past.
      // For the Consistency.TIMELINE case, we can't reuse the caller
      // since we could be making parallel RPCs (caller.callWithRetries is synchronized
      // and we can't invoke it multiple times at the same time).
      this.caller = ScannerCallableWithReplicas.this.caller;
      if (scan.getConsistency() == Consistency.TIMELINE) {
        this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
            .<Result[]>newCaller();
      }
    }

    @Override
    public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
      // since the retries are done within the ResultBoundedCompletionService,
      // we don't invoke callWithRetries here
      if (cancelled) {
        return null;
      }
      Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
      return new Pair<>(res, this.callable);
    }

    @Override
    public void prepare(boolean reload) throws IOException {
      if (cancelled) return;

      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }

      callable.prepare(reload);
    }

    @Override
    public void throwable(Throwable t, boolean retrying) {
      callable.throwable(t, retrying);
    }

    @Override
    public String getExceptionMessageAdditionalDetail() {
      return callable.getExceptionMessageAdditionalDetail();
    }

    @Override
    public long sleep(long pause, int tries) {
      return callable.sleep(pause, tries);
    }

    @Override
    public void cancel() {
      cancelled = true;
      caller.cancel();
      if (callable.getRpcController() != null) {
        callable.getRpcController().startCancel();
      }
      someRPCcancelled = true;
    }

    @Override
    public boolean isCancelled() {
      return cancelled;
    }
  }

  @Override
  public void prepare(boolean reload) throws IOException {
    // Intentionally a no-op: each replica's ScannerCallable is prepared through
    // RetryingRPC#prepare when its RPC is submitted.
  }

  @Override
  public void throwable(Throwable t, boolean retrying) {
    currentScannerCallable.throwable(t, retrying);
  }

  @Override
  public String getExceptionMessageAdditionalDetail() {
    return currentScannerCallable.getExceptionMessageAdditionalDetail();
  }

  @Override
  public long sleep(long pause, int tries) {
    return currentScannerCallable.sleep(pause, tries);
  }
}