001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.client; 020 021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 022 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.util.HashSet; 026import java.util.Set; 027import java.util.concurrent.Callable; 028import java.util.concurrent.CancellationException; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Future; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicBoolean; 034 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.DoNotRetryIOException; 037import org.apache.hadoop.hbase.HRegionInfo; 038import org.apache.hadoop.hbase.RegionLocations; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; 044import org.apache.hadoop.hbase.util.Pair; 045 046/** 047 * This class has the logic for handling scanners for regions with and without replicas. 048 * 1. A scan is attempted on the default (primary) region 049 * 2. The scanner sends all the RPCs to the default region until it is done, or, there 050 * is a timeout on the default (a timeout of zero is disallowed). 051 * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s) 052 * 4. The results from the first successful scanner are taken, and it is stored which server 053 * returned the results. 054 * 5. The next RPCs are done on the above stored server until it is done or there is a timeout, 055 * in which case, the other replicas are queried (as in (3) above). 056 * 057 */ 058@InterfaceAudience.Private 059class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { 060 private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class); 061 volatile ScannerCallable currentScannerCallable; 062 AtomicBoolean replicaSwitched = new AtomicBoolean(false); 063 final ClusterConnection cConnection; 064 protected final ExecutorService pool; 065 protected final int timeBeforeReplicas; 066 private final Scan scan; 067 private final int retries; 068 private Result lastResult; 069 private final RpcRetryingCaller<Result[]> caller; 070 private final TableName tableName; 071 private Configuration conf; 072 private int scannerTimeout; 073 private Set<ScannerCallable> outstandingCallables = new HashSet<>(); 074 private boolean someRPCcancelled = false; //required for testing purposes only 075 private int regionReplication = 0; 076 077 public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, 078 ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, 079 int retries, int scannerTimeout, int caching, Configuration conf, 080 RpcRetryingCaller<Result []> caller) { 081 this.currentScannerCallable = baseCallable; 082 this.cConnection = cConnection; 083 this.pool = pool; 084 if (timeBeforeReplicas < 0) { 085 throw new IllegalArgumentException("Invalid value of operation timeout on the primary"); 086 } 087 this.timeBeforeReplicas = timeBeforeReplicas; 088 this.scan = scan; 089 this.retries = retries; 090 this.tableName = tableName; 091 this.conf = conf; 092 this.scannerTimeout = scannerTimeout; 093 this.caller = caller; 094 } 095 096 public void setClose() { 097 if(currentScannerCallable != null) { 098 currentScannerCallable.setClose(); 099 } else { 100 LOG.warn("Calling close on ScannerCallable reference that is already null, " 101 + "which shouldn't happen."); 102 } 103 } 104 105 public void setRenew(boolean val) { 106 currentScannerCallable.setRenew(val); 107 } 108 109 public void setCaching(int caching) { 110 currentScannerCallable.setCaching(caching); 111 } 112 113 public int getCaching() { 114 return currentScannerCallable.getCaching(); 115 } 116 117 public HRegionInfo getHRegionInfo() { 118 return currentScannerCallable.getHRegionInfo(); 119 } 120 121 public MoreResults moreResultsInRegion() { 122 return currentScannerCallable.moreResultsInRegion(); 123 } 124 125 public MoreResults moreResultsForScan() { 126 return currentScannerCallable.moreResultsForScan(); 127 } 128 129 @Override 130 public Result [] call(int timeout) throws IOException { 131 // If the active replica callable was closed somewhere, invoke the RPC to 132 // really close it. In the case of regular scanners, this applies. We make couple 133 // of RPCs to a RegionServer, and when that region is exhausted, we set 134 // the closed flag. Then an RPC is required to actually close the scanner. 135 if (currentScannerCallable != null && currentScannerCallable.closed) { 136 // For closing we target that exact scanner (and not do replica fallback like in 137 // the case of normal reads) 138 if (LOG.isTraceEnabled()) { 139 LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId); 140 } 141 Result[] r = currentScannerCallable.call(timeout); 142 currentScannerCallable = null; 143 return r; 144 } else if(currentScannerCallable == null) { 145 LOG.warn("Another call received, but our ScannerCallable is already null. " 146 + "This shouldn't happen, but there's not much to do, so logging and returning null."); 147 return null; 148 } 149 // We need to do the following: 150 //1. When a scan goes out to a certain replica (default or not), we need to 151 // continue to hit that until there is a failure. So store the last successfully invoked 152 // replica 153 //2. We should close the "losing" scanners (scanners other than the ones we hear back 154 // from first) 155 // 156 // Since RegionReplication is a table attribute, it wont change as long as table is enabled, 157 // it just needs to be set once. 158 159 if (regionReplication <= 0) { 160 RegionLocations rl = null; 161 try { 162 rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, 163 RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, 164 currentScannerCallable.getRow()); 165 } catch (RetriesExhaustedException | DoNotRetryIOException e) { 166 // We cannot get the primary replica region location, it is possible that the region server 167 // hosting meta table is down, it needs to proceed to try cached replicas directly. 168 if (cConnection instanceof ConnectionImplementation) { 169 rl = ((ConnectionImplementation) cConnection) 170 .getCachedLocation(tableName, currentScannerCallable.getRow()); 171 if (rl == null) { 172 throw e; 173 } 174 } else { 175 // For completeness 176 throw e; 177 } 178 } 179 regionReplication = rl.size(); 180 } 181 // allocate a boundedcompletion pool of some multiple of number of replicas. 182 // We want to accomodate some RPCs for redundant replica scans (but are still in progress) 183 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs = 184 new ResultBoundedCompletionService<>( 185 RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, 186 regionReplication * 5); 187 188 AtomicBoolean done = new AtomicBoolean(false); 189 replicaSwitched.set(false); 190 // submit call for the primary replica. 191 addCallsForCurrentReplica(cs); 192 int startIndex = 0; 193 194 try { 195 // wait for the timeout to see whether the primary responds back 196 Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas, 197 TimeUnit.MICROSECONDS); // Yes, microseconds 198 if (f != null) { 199 // After poll, if f is not null, there must be a completed task 200 Pair<Result[], ScannerCallable> r = f.get(); 201 if (r != null && r.getSecond() != null) { 202 updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); 203 } 204 return r == null ? null : r.getFirst(); //great we got a response 205 } 206 } catch (ExecutionException e) { 207 // We ignore the ExecutionException and continue with the replicas 208 if (LOG.isDebugEnabled()) { 209 LOG.debug("Scan with primary region returns " + e.getCause()); 210 } 211 212 // If rl's size is 1 or scan's consitency is strong, it needs to throw 213 // out the exception from the primary replica 214 if ((regionReplication == 1) || (scan.getConsistency() == Consistency.STRONG)) { 215 // Rethrow the first exception 216 RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); 217 } 218 219 startIndex = 1; 220 } catch (CancellationException e) { 221 throw new InterruptedIOException(e.getMessage()); 222 } catch (InterruptedException e) { 223 throw new InterruptedIOException(e.getMessage()); 224 } 225 226 // submit call for the all of the secondaries at once 227 int endIndex = regionReplication; 228 if (scan.getConsistency() == Consistency.STRONG) { 229 // When scan's consistency is strong, do not send to the secondaries 230 endIndex = 1; 231 } else { 232 // TODO: this may be an overkill for large region replication 233 addCallsForOtherReplicas(cs, 0, regionReplication - 1); 234 } 235 236 try { 237 Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout, 238 TimeUnit.MILLISECONDS, startIndex, endIndex); 239 240 if (f == null) { 241 throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms"); 242 } 243 Pair<Result[], ScannerCallable> r = f.get(); 244 245 if (r != null && r.getSecond() != null) { 246 updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); 247 } 248 return r == null ? null : r.getFirst(); // great we got an answer 249 250 } catch (ExecutionException e) { 251 RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); 252 } catch (CancellationException e) { 253 throw new InterruptedIOException(e.getMessage()); 254 } catch (InterruptedException e) { 255 throw new InterruptedIOException(e.getMessage()); 256 } finally { 257 // We get there because we were interrupted or because one or more of the 258 // calls succeeded or failed. In all case, we stop all our tasks. 259 cs.cancelAll(); 260 } 261 LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable 262 throw new IOException("Imposible? Arrive at an unreachable line..."); 263 } 264 265 private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result, 266 AtomicBoolean done, ExecutorService pool) { 267 if (done.compareAndSet(false, true)) { 268 if (currentScannerCallable != scanner) replicaSwitched.set(true); 269 currentScannerCallable = scanner; 270 // store where to start the replica scanner from if we need to. 271 if (result != null && result.length != 0) this.lastResult = result[result.length - 1]; 272 if (LOG.isTraceEnabled()) { 273 LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId + 274 " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId()); 275 } 276 // close all outstanding replica scanners but the one we heard back from 277 outstandingCallables.remove(scanner); 278 for (ScannerCallable s : outstandingCallables) { 279 if (LOG.isTraceEnabled()) { 280 LOG.trace("Closing scanner id=" + s.scannerId + 281 ", replica=" + s.getHRegionInfo().getRegionId() + 282 " because slow and replica=" + 283 this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded"); 284 } 285 // Submit the "close" to the pool since this might take time, and we don't 286 // want to wait for the "close" to happen yet. The "wait" will happen when 287 // the table is closed (when the awaitTermination of the underlying pool is called) 288 s.setClose(); 289 final RetryingRPC r = new RetryingRPC(s); 290 pool.submit(new Callable<Void>(){ 291 @Override 292 public Void call() throws Exception { 293 r.call(scannerTimeout); 294 return null; 295 } 296 }); 297 } 298 // now clear outstandingCallables since we scheduled a close for all the contained scanners 299 outstandingCallables.clear(); 300 } 301 } 302 303 /** 304 * When a scanner switches in the middle of scanning (the 'next' call fails 305 * for example), the upper layer {@link ClientScanner} needs to know 306 */ 307 public boolean switchedToADifferentReplica() { 308 return replicaSwitched.get(); 309 } 310 311 /** 312 * @return true when the most recent RPC response indicated that the response was a heartbeat 313 * message. Heartbeat messages are sent back from the server when the processing of the 314 * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid 315 * timeouts during long running scan operations. 316 */ 317 public boolean isHeartbeatMessage() { 318 return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage(); 319 } 320 321 public Cursor getCursor() { 322 return currentScannerCallable != null ? currentScannerCallable.getCursor() : null; 323 } 324 325 private void addCallsForCurrentReplica( 326 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) { 327 RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); 328 outstandingCallables.add(currentScannerCallable); 329 cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); 330 } 331 332 private void addCallsForOtherReplicas( 333 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) { 334 335 for (int id = min; id <= max; id++) { 336 if (currentScannerCallable.id == id) { 337 continue; //this was already scheduled earlier 338 } 339 ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id); 340 setStartRowForReplicaCallable(s); 341 outstandingCallables.add(s); 342 RetryingRPC retryingOnReplica = new RetryingRPC(s); 343 cs.submit(retryingOnReplica, scannerTimeout, id); 344 } 345 } 346 347 /** 348 * Set the start row for the replica callable based on the state of the last result received. 349 * @param callable The callable to set the start row on 350 */ 351 private void setStartRowForReplicaCallable(ScannerCallable callable) { 352 if (this.lastResult == null || callable == null) { 353 return; 354 } 355 // 1. The last result was a partial result which means we have not received all of the cells 356 // for this row. Thus, use the last result's row as the start row. If a replica switch 357 // occurs, the scanner will ensure that any accumulated partial results are cleared, 358 // and the scan can resume from this row. 359 // 2. The last result was not a partial result which means it contained all of the cells for 360 // that row (we no longer need any information from it). Set the start row to the next 361 // closest row that could be seen. 362 callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.mayHaveMoreCellsInRow()); 363 } 364 365 @VisibleForTesting 366 boolean isAnyRPCcancelled() { 367 return someRPCcancelled; 368 } 369 370 class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable { 371 final ScannerCallable callable; 372 RpcRetryingCaller<Result[]> caller; 373 private volatile boolean cancelled = false; 374 375 RetryingRPC(ScannerCallable callable) { 376 this.callable = callable; 377 // For the Consistency.STRONG (default case), we reuse the caller 378 // to keep compatibility with what is done in the past 379 // For the Consistency.TIMELINE case, we can't reuse the caller 380 // since we could be making parallel RPCs (caller.callWithRetries is synchronized 381 // and we can't invoke it multiple times at the same time) 382 this.caller = ScannerCallableWithReplicas.this.caller; 383 if (scan.getConsistency() == Consistency.TIMELINE) { 384 this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf) 385 .<Result[]>newCaller(); 386 } 387 } 388 389 @Override 390 public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException { 391 // since the retries is done within the ResultBoundedCompletionService, 392 // we don't invoke callWithRetries here 393 if (cancelled) { 394 return null; 395 } 396 Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout); 397 return new Pair<>(res, this.callable); 398 } 399 400 @Override 401 public void prepare(boolean reload) throws IOException { 402 if (cancelled) return; 403 404 if (Thread.interrupted()) { 405 throw new InterruptedIOException(); 406 } 407 408 callable.prepare(reload); 409 } 410 411 @Override 412 public void throwable(Throwable t, boolean retrying) { 413 callable.throwable(t, retrying); 414 } 415 416 @Override 417 public String getExceptionMessageAdditionalDetail() { 418 return callable.getExceptionMessageAdditionalDetail(); 419 } 420 421 @Override 422 public long sleep(long pause, int tries) { 423 return callable.sleep(pause, tries); 424 } 425 426 @Override 427 public void cancel() { 428 cancelled = true; 429 caller.cancel(); 430 if (callable.getRpcController() != null) { 431 callable.getRpcController().startCancel(); 432 } 433 someRPCcancelled = true; 434 } 435 436 @Override 437 public boolean isCancelled() { 438 return cancelled; 439 } 440 } 441 442 @Override 443 public void prepare(boolean reload) throws IOException { 444 } 445 446 @Override 447 public void throwable(Throwable t, boolean retrying) { 448 currentScannerCallable.throwable(t, retrying); 449 } 450 451 @Override 452 public String getExceptionMessageAdditionalDetail() { 453 return currentScannerCallable.getExceptionMessageAdditionalDetail(); 454 } 455 456 @Override 457 public long sleep(long pause, int tries) { 458 return currentScannerCallable.sleep(pause, tries); 459 } 460}