/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class has the logic for handling scanners for regions with and without replicas.
 * <ol>
 * <li>A scan is attempted on the default (primary) region, or on a specific region.</li>
 * <li>The scanner sends all the RPCs to the default/specific region until it is done, or there is
 * a timeout on the default/specific region (a timeout of zero is disallowed).</li>
 * <li>If there is a timeout in (2) above, scanners are opened on the non-default replica(s), but
 * only for Consistency.TIMELINE scans with no specific replica id specified.</li>
 * <li>The results from the first successful scanner are taken, and the server that returned them
 * is remembered.</li>
 * <li>The next RPCs are sent to the server stored in (4) until it is done or there is a timeout,
 * in which case the other replicas are queried (as in (3) above).</li>
 * </ol>
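 * <p>
 * A minimal usage sketch, assuming an already open {@code connection} and a hypothetical table
 * name. Client code does not use this class directly; it only sets the consistency on the
 * {@link Scan} and lets {@link ClientScanner} drive these callables:
 *
 * <pre>{@code
 * Scan scan = new Scan();
 * scan.setConsistency(Consistency.TIMELINE); // allow fallback to secondary replicas
 * try (Table table = connection.getTable(TableName.valueOf("exampleTable"));
 *   ResultScanner scanner = table.getScanner(scan)) {
 *   for (Result result : scanner) {
 *     // Result#isStale() is true when the row was served by a secondary replica
 *   }
 * }
 * }</pre>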
 */
@InterfaceAudience.Private
class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
  private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
  volatile ScannerCallable currentScannerCallable;
  AtomicBoolean replicaSwitched = new AtomicBoolean(false);
  final ClusterConnection cConnection;
  protected final ExecutorService pool;
  protected final int timeBeforeReplicas;
  private final Scan scan;
  private final int retries;
  private Result lastResult;
  private final RpcRetryingCaller<Result[]> caller;
  private final TableName tableName;
  private Configuration conf;
  private final int scannerTimeout;
  private final int readRpcTimeout;
  private Set<ScannerCallable> outstandingCallables = new HashSet<>();
  private boolean someRPCcancelled = false; // required for testing purposes only
  private int regionReplication = 0;

  public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
    ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
    int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf,
    RpcRetryingCaller<Result[]> caller) {
    this.currentScannerCallable = baseCallable;
    this.cConnection = cConnection;
    this.pool = pool;
    if (timeBeforeReplicas < 0) {
      throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
    }
    this.timeBeforeReplicas = timeBeforeReplicas;
    this.scan = scan;
    this.retries = retries;
    this.tableName = tableName;
    this.conf = conf;
    this.readRpcTimeout = readRpcTimeout;
    this.scannerTimeout = scannerTimeout;
    this.caller = caller;
  }

  public void setClose() {
    if (currentScannerCallable != null) {
      currentScannerCallable.setClose();
    } else {
      LOG.warn("Calling close on ScannerCallable reference that is already null, "
        + "which shouldn't happen.");
    }
  }

  public void setRenew(boolean val) {
    currentScannerCallable.setRenew(val);
  }

  public void setCaching(int caching) {
    currentScannerCallable.setCaching(caching);
  }

  public int getCaching() {
    return currentScannerCallable.getCaching();
  }

  public HRegionInfo getHRegionInfo() {
    return currentScannerCallable.getHRegionInfo();
  }

  public MoreResults moreResultsInRegion() {
    return currentScannerCallable.moreResultsInRegion();
  }

  public MoreResults moreResultsForScan() {
    return currentScannerCallable.moreResultsForScan();
  }

  @Override
  public Result[] call(int timeout) throws IOException {
    // If the active replica callable was closed somewhere, invoke the RPC to
    // really close it. In the case of regular scanners, this applies. We make a couple
    // of RPCs to a RegionServer, and when that region is exhausted, we set
    // the closed flag. Then an RPC is required to actually close the scanner.
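    // (Typically this is driven by ClientScanner#close(): it calls setClose() above and then
    // invokes call() once more so that the close RPC below is actually sent, after which
    // currentScannerCallable is nulled out and any further call() just logs and returns null.)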
    if (currentScannerCallable != null && currentScannerCallable.closed) {
      // For closing we target that exact scanner (and do not do replica fallback like in
      // the case of normal reads)
      if (LOG.isTraceEnabled()) {
        LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
      }
      Result[] r = currentScannerCallable.call(timeout);
      currentScannerCallable = null;
      return r;
    } else if (currentScannerCallable == null) {
      LOG.warn("Another call received, but our ScannerCallable is already null. "
        + "This shouldn't happen, but there's not much to do, so logging and returning null.");
      return null;
    }
    // We need to do the following:
    // 1. When a scan goes out to a certain replica (default or not), we need to
    // continue to hit that replica until there is a failure. So store the last successfully
    // invoked replica.
    // 2. We should close the "losing" scanners (scanners other than the one we hear back
    // from first).
    //
    // Since RegionReplication is a table attribute, it won't change as long as the table is
    // enabled; it just needs to be fetched once.

    if (regionReplication <= 0) {
      RegionLocations rl = null;
      try {
        rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
          RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
          currentScannerCallable.getRow());
      } catch (RetriesExhaustedException | DoNotRetryIOException e) {
        // We cannot get the primary replica region location. It is possible that the region
        // server hosting the meta table is down, so proceed to try the cached replica locations
        // directly.
        if (cConnection instanceof ConnectionImplementation) {
          rl = ((ConnectionImplementation) cConnection).getCachedLocation(tableName,
            currentScannerCallable.getRow());
          if (rl == null) {
            throw e;
          }
        } else {
          // For completeness
          throw e;
        }
      }
      regionReplication = rl.size();
    }
    // Allocate a bounded completion pool of some multiple of the number of replicas.
    // We want to accommodate some RPCs for redundant replica scans that are still in progress.
    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
      new ResultBoundedCompletionService<>(
        RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
        regionReplication * 5);

    AtomicBoolean done = new AtomicBoolean(false);
    replicaSwitched.set(false);
    // submit the call for the primary replica or the user-specified replica
    addCallsForCurrentReplica(cs);
    int startIndex = 0;

    try {
      // wait for the timeout to see whether the primary responds back
      Future<Pair<Result[], ScannerCallable>> f =
        cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
      if (f != null) {
        // After poll, if f is not null, there must be a completed task
        Pair<Result[], ScannerCallable> r = f.get();
        if (r != null && r.getSecond() != null) {
          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
        }
        return r == null ? null : r.getFirst(); // great we got a response
      }
    } catch (ExecutionException e) {
      // We ignore the ExecutionException and continue with the replicas
      if (LOG.isDebugEnabled()) {
        LOG.debug("Scan with primary region returns " + e.getCause());
      }

      // If the region replication is 1, or the scan's consistency is strong, or the scan targets
      // a specific replica, rethrow the exception from the primary replica
      if (
        regionReplication == 1 || scan.getConsistency() == Consistency.STRONG
          || scan.getReplicaId() >= 0
      ) {
        // Rethrow the first exception
        RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
      }
      startIndex = 1;
    } catch (CancellationException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    }

    // submit calls for all of the secondaries at once
    int endIndex = regionReplication;
    if (scan.getConsistency() == Consistency.STRONG || scan.getReplicaId() >= 0) {
      // When the scan's consistency is strong or the scan targets a specific replica region, do
      // not send to the secondaries
      endIndex = 1;
    } else {
      // TODO: this may be overkill for large region replication
      addCallsForOtherReplicas(cs, 0, regionReplication - 1);
    }

    try {
      Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
        TimeUnit.MILLISECONDS, startIndex, endIndex);

      if (f == null) {
        throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms");
      }
      Pair<Result[], ScannerCallable> r = f.get();

      if (r != null && r.getSecond() != null) {
        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
      }
      return r == null ? null : r.getFirst(); // great we got an answer

    } catch (ExecutionException e) {
      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
    } catch (CancellationException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    } finally {
      // We get here because we were interrupted or because one or more of the
      // calls succeeded or failed. In all cases, we stop all our tasks.
      cs.cancelAll();
    }
    LOG.error("Impossible? Arrived at an unreachable line..."); // unreachable
    throw new IOException("Impossible? Arrived at an unreachable line...");
  }

  @SuppressWarnings("FutureReturnValueIgnored")
  private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
    AtomicBoolean done, ExecutorService pool) {
    if (done.compareAndSet(false, true)) {
      if (currentScannerCallable != scanner) replicaSwitched.set(true);
      currentScannerCallable = scanner;
      // store where to start the replica scanner from if we need to.
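      // (The last Result of the winning batch is remembered so that, if we later fail over to
      // another replica, setStartRowForReplicaCallable can resume the scan from the right row.)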
      if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
      if (LOG.isTraceEnabled()) {
        LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId
          + " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
      }
      // close all outstanding replica scanners but the one we heard back from
      outstandingCallables.remove(scanner);
      for (ScannerCallable s : outstandingCallables) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Closing scanner id=" + s.scannerId + ", replica="
            + s.getHRegionInfo().getRegionId() + " because slow and replica="
            + this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
        }
        // Submit the "close" to the pool since this might take time, and we don't
        // want to wait for the "close" to happen yet. The "wait" will happen when
        // the table is closed (when the awaitTermination of the underlying pool is called)
        s.setClose();
        final RetryingRPC r = new RetryingRPC(s);
        pool.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            r.call(scannerTimeout);
            return null;
          }
        });
      }
      // now clear outstandingCallables since we scheduled a close for all the contained scanners
      outstandingCallables.clear();
    }
  }

  /**
   * When a scanner switches in the middle of scanning (the 'next' call fails, for example), the
   * upper layer {@link ClientScanner} needs to know about it.
   */
  public boolean switchedToADifferentReplica() {
    return replicaSwitched.get();
  }

  /**
   * Returns true when the most recent RPC response indicated that the response was a heartbeat
   * message. Heartbeat messages are sent back from the server when the processing of the scan
   * request exceeds a certain time threshold. Heartbeats allow the server to avoid timeouts during
   * long-running scan operations.
   */
  public boolean isHeartbeatMessage() {
    return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
  }

  public Cursor getCursor() {
    return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
  }

  private void
    addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
    RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
    outstandingCallables.add(currentScannerCallable);
    cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id);
  }

  private void addCallsForOtherReplicas(
    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {

    for (int id = min; id <= max; id++) {
      if (currentScannerCallable.id == id) {
        continue; // this was already scheduled earlier
      }
      ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
      setStartRowForReplicaCallable(s);
      outstandingCallables.add(s);
      RetryingRPC retryingOnReplica = new RetryingRPC(s);
      cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
    }
  }

  /**
   * Set the start row for the replica callable based on the state of the last result received.
   * @param callable The callable to set the start row on
   */
  private void setStartRowForReplicaCallable(ScannerCallable callable) {
    if (this.lastResult == null || callable == null) {
      return;
    }
    // 1. The last result was a partial result, which means we have not received all of the cells
    // for this row. Thus, use the last result's row as the start row. If a replica switch
    // occurs, the scanner will ensure that any accumulated partial results are cleared,
    // and the scan can resume from this row.
    // 2. The last result was not a partial result, which means it contained all of the cells for
    // that row (we no longer need any information from it). Set the start row to the next
    // closest row that could be seen.
    callable.getScan().withStartRow(this.lastResult.getRow(),
      this.lastResult.mayHaveMoreCellsInRow());
  }

  boolean isAnyRPCcancelled() {
    return someRPCcancelled;
  }

  class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
    final ScannerCallable callable;
    RpcRetryingCaller<Result[]> caller;
    private volatile boolean cancelled = false;

    RetryingRPC(ScannerCallable callable) {
      this.callable = callable;
      // For Consistency.STRONG (the default case), we reuse the caller
      // to stay compatible with past behavior.
      // For the Consistency.TIMELINE case, we can't reuse the caller
      // since we could be making parallel RPCs (caller.callWithRetries is synchronized
      // and we can't invoke it multiple times concurrently).
      this.caller = ScannerCallableWithReplicas.this.caller;
      if (scan.getConsistency() == Consistency.TIMELINE) {
        this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
          .<Result[]> newCaller();
      }
    }

    @Override
    public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
      // Since the retrying is done within the ResultBoundedCompletionService,
      // we don't invoke callWithRetries here.
      if (cancelled) {
        return null;
      }
      Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
      return new Pair<>(res, this.callable);
    }

    @Override
    public void prepare(boolean reload) throws IOException {
      if (cancelled) return;

      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }

      callable.prepare(reload);
    }

    @Override
    public void throwable(Throwable t, boolean retrying) {
      callable.throwable(t, retrying);
    }

    @Override
    public String getExceptionMessageAdditionalDetail() {
      return callable.getExceptionMessageAdditionalDetail();
    }

    @Override
    public long sleep(long pause, int tries) {
      return callable.sleep(pause, tries);
    }

    @Override
    public void cancel() {
      cancelled = true;
      caller.cancel();
      if (callable.getRpcController() != null) {
        callable.getRpcController().startCancel();
      }
      someRPCcancelled = true;
    }

    @Override
    public boolean isCancelled() {
      return cancelled;
    }
  }

  @Override
  public void prepare(boolean reload) throws IOException {
  }

  @Override
  public void throwable(Throwable t, boolean retrying) {
    currentScannerCallable.throwable(t, retrying);
  }

  @Override
  public String getExceptionMessageAdditionalDetail() {
    return currentScannerCallable.getExceptionMessageAdditionalDetail();
  }

  @Override
  public long sleep(long pause, int tries) {
    return currentScannerCallable.sleep(pause, tries);
  }
}