001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.client; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.HashSet; 024import java.util.Set; 025import java.util.concurrent.Callable; 026import java.util.concurrent.CancellationException; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Future; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HRegionInfo; 036import org.apache.hadoop.hbase.RegionLocations; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; 039import org.apache.hadoop.hbase.util.Pair; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * This class has the logic for handling scanners for regions with and without replicas. 046 * 1. A scan is attempted on the default (primary) region, or a specific region. 047 * 2. The scanner sends all the RPCs to the default/specific region until it is done, or, there 048 * is a timeout on the default/specific region (a timeout of zero is disallowed). 049 * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s) only 050 * for Consistency.TIMELINE without specific replica id specified. 051 * 4. The results from the first successful scanner are taken, and it is stored which server 052 * returned the results. 053 * 5. The next RPCs are done on the above stored server until it is done or there is a timeout, 054 * in which case, the other replicas are queried (as in (3) above). 055 * 056 */ 057@InterfaceAudience.Private 058class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { 059 private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class); 060 volatile ScannerCallable currentScannerCallable; 061 AtomicBoolean replicaSwitched = new AtomicBoolean(false); 062 final ClusterConnection cConnection; 063 protected final ExecutorService pool; 064 protected final int timeBeforeReplicas; 065 private final Scan scan; 066 private final int retries; 067 private Result lastResult; 068 private final RpcRetryingCaller<Result[]> caller; 069 private final TableName tableName; 070 private Configuration conf; 071 private int scannerTimeout; 072 private Set<ScannerCallable> outstandingCallables = new HashSet<>(); 073 private boolean someRPCcancelled = false; //required for testing purposes only 074 private int regionReplication = 0; 075 076 public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, 077 ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, 078 int retries, int scannerTimeout, int caching, Configuration conf, 079 RpcRetryingCaller<Result []> caller) { 080 this.currentScannerCallable = baseCallable; 081 this.cConnection = cConnection; 082 this.pool = pool; 083 if (timeBeforeReplicas < 0) { 084 throw new IllegalArgumentException("Invalid value of operation timeout on the primary"); 085 } 086 this.timeBeforeReplicas = timeBeforeReplicas; 087 this.scan = scan; 088 this.retries = retries; 089 this.tableName = tableName; 090 this.conf = conf; 091 this.scannerTimeout = scannerTimeout; 092 this.caller = caller; 093 } 094 095 public void setClose() { 096 if(currentScannerCallable != null) { 097 currentScannerCallable.setClose(); 098 } else { 099 LOG.warn("Calling close on ScannerCallable reference that is already null, " 100 + "which shouldn't happen."); 101 } 102 } 103 104 public void setRenew(boolean val) { 105 currentScannerCallable.setRenew(val); 106 } 107 108 public void setCaching(int caching) { 109 currentScannerCallable.setCaching(caching); 110 } 111 112 public int getCaching() { 113 return currentScannerCallable.getCaching(); 114 } 115 116 public HRegionInfo getHRegionInfo() { 117 return currentScannerCallable.getHRegionInfo(); 118 } 119 120 public MoreResults moreResultsInRegion() { 121 return currentScannerCallable.moreResultsInRegion(); 122 } 123 124 public MoreResults moreResultsForScan() { 125 return currentScannerCallable.moreResultsForScan(); 126 } 127 128 @Override 129 public Result[] call(int timeout) throws IOException { 130 // If the active replica callable was closed somewhere, invoke the RPC to 131 // really close it. In the case of regular scanners, this applies. We make couple 132 // of RPCs to a RegionServer, and when that region is exhausted, we set 133 // the closed flag. Then an RPC is required to actually close the scanner. 134 if (currentScannerCallable != null && currentScannerCallable.closed) { 135 // For closing we target that exact scanner (and not do replica fallback like in 136 // the case of normal reads) 137 if (LOG.isTraceEnabled()) { 138 LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId); 139 } 140 Result[] r = currentScannerCallable.call(timeout); 141 currentScannerCallable = null; 142 return r; 143 } else if(currentScannerCallable == null) { 144 LOG.warn("Another call received, but our ScannerCallable is already null. " 145 + "This shouldn't happen, but there's not much to do, so logging and returning null."); 146 return null; 147 } 148 // We need to do the following: 149 //1. When a scan goes out to a certain replica (default or not), we need to 150 // continue to hit that until there is a failure. So store the last successfully invoked 151 // replica 152 //2. We should close the "losing" scanners (scanners other than the ones we hear back 153 // from first) 154 // 155 // Since RegionReplication is a table attribute, it wont change as long as table is enabled, 156 // it just needs to be set once. 157 158 if (regionReplication <= 0) { 159 RegionLocations rl = null; 160 try { 161 rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, 162 RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, 163 currentScannerCallable.getRow()); 164 } catch (RetriesExhaustedException | DoNotRetryIOException e) { 165 // We cannot get the primary replica region location, it is possible that the region server 166 // hosting meta table is down, it needs to proceed to try cached replicas directly. 167 if (cConnection instanceof ConnectionImplementation) { 168 rl = ((ConnectionImplementation) cConnection) 169 .getCachedLocation(tableName, currentScannerCallable.getRow()); 170 if (rl == null) { 171 throw e; 172 } 173 } else { 174 // For completeness 175 throw e; 176 } 177 } 178 regionReplication = rl.size(); 179 } 180 // allocate a boundedcompletion pool of some multiple of number of replicas. 181 // We want to accomodate some RPCs for redundant replica scans (but are still in progress) 182 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs = 183 new ResultBoundedCompletionService<>( 184 RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, 185 regionReplication * 5); 186 187 AtomicBoolean done = new AtomicBoolean(false); 188 replicaSwitched.set(false); 189 // submit call for the primary replica or user specified replica 190 addCallsForCurrentReplica(cs); 191 int startIndex = 0; 192 193 try { 194 // wait for the timeout to see whether the primary responds back 195 Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas, 196 TimeUnit.MICROSECONDS); // Yes, microseconds 197 if (f != null) { 198 // After poll, if f is not null, there must be a completed task 199 Pair<Result[], ScannerCallable> r = f.get(); 200 if (r != null && r.getSecond() != null) { 201 updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); 202 } 203 return r == null ? null : r.getFirst(); //great we got a response 204 } 205 } catch (ExecutionException e) { 206 // We ignore the ExecutionException and continue with the replicas 207 if (LOG.isDebugEnabled()) { 208 LOG.debug("Scan with primary region returns " + e.getCause()); 209 } 210 211 // If rl's size is 1 or scan's consitency is strong, or scan is over specific replica, 212 // it needs to throw out the exception from the primary replica 213 if (regionReplication == 1 || scan.getConsistency() == Consistency.STRONG || 214 scan.getReplicaId() >= 0) { 215 // Rethrow the first exception 216 RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); 217 } 218 startIndex = 1; 219 } catch (CancellationException e) { 220 throw new InterruptedIOException(e.getMessage()); 221 } catch (InterruptedException e) { 222 throw new InterruptedIOException(e.getMessage()); 223 } 224 225 // submit call for the all of the secondaries at once 226 int endIndex = regionReplication; 227 if (scan.getConsistency() == Consistency.STRONG || scan.getReplicaId() >= 0) { 228 // When scan's consistency is strong or scan is over specific replica region, do not send to 229 // the secondaries 230 endIndex = 1; 231 } else { 232 // TODO: this may be an overkill for large region replication 233 addCallsForOtherReplicas(cs, 0, regionReplication - 1); 234 } 235 236 try { 237 Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout, 238 TimeUnit.MILLISECONDS, startIndex, endIndex); 239 240 if (f == null) { 241 throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms"); 242 } 243 Pair<Result[], ScannerCallable> r = f.get(); 244 245 if (r != null && r.getSecond() != null) { 246 updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); 247 } 248 return r == null ? null : r.getFirst(); // great we got an answer 249 250 } catch (ExecutionException e) { 251 RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); 252 } catch (CancellationException e) { 253 throw new InterruptedIOException(e.getMessage()); 254 } catch (InterruptedException e) { 255 throw new InterruptedIOException(e.getMessage()); 256 } finally { 257 // We get there because we were interrupted or because one or more of the 258 // calls succeeded or failed. In all case, we stop all our tasks. 259 cs.cancelAll(); 260 } 261 LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable 262 throw new IOException("Imposible? Arrive at an unreachable line..."); 263 } 264 265 private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result, 266 AtomicBoolean done, ExecutorService pool) { 267 if (done.compareAndSet(false, true)) { 268 if (currentScannerCallable != scanner) replicaSwitched.set(true); 269 currentScannerCallable = scanner; 270 // store where to start the replica scanner from if we need to. 271 if (result != null && result.length != 0) this.lastResult = result[result.length - 1]; 272 if (LOG.isTraceEnabled()) { 273 LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId + 274 " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId()); 275 } 276 // close all outstanding replica scanners but the one we heard back from 277 outstandingCallables.remove(scanner); 278 for (ScannerCallable s : outstandingCallables) { 279 if (LOG.isTraceEnabled()) { 280 LOG.trace("Closing scanner id=" + s.scannerId + 281 ", replica=" + s.getHRegionInfo().getRegionId() + 282 " because slow and replica=" + 283 this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded"); 284 } 285 // Submit the "close" to the pool since this might take time, and we don't 286 // want to wait for the "close" to happen yet. The "wait" will happen when 287 // the table is closed (when the awaitTermination of the underlying pool is called) 288 s.setClose(); 289 final RetryingRPC r = new RetryingRPC(s); 290 pool.submit(new Callable<Void>(){ 291 @Override 292 public Void call() throws Exception { 293 r.call(scannerTimeout); 294 return null; 295 } 296 }); 297 } 298 // now clear outstandingCallables since we scheduled a close for all the contained scanners 299 outstandingCallables.clear(); 300 } 301 } 302 303 /** 304 * When a scanner switches in the middle of scanning (the 'next' call fails 305 * for example), the upper layer {@link ClientScanner} needs to know 306 */ 307 public boolean switchedToADifferentReplica() { 308 return replicaSwitched.get(); 309 } 310 311 /** 312 * @return true when the most recent RPC response indicated that the response was a heartbeat 313 * message. Heartbeat messages are sent back from the server when the processing of the 314 * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid 315 * timeouts during long running scan operations. 316 */ 317 public boolean isHeartbeatMessage() { 318 return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage(); 319 } 320 321 public Cursor getCursor() { 322 return currentScannerCallable != null ? currentScannerCallable.getCursor() : null; 323 } 324 325 private void addCallsForCurrentReplica( 326 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) { 327 RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); 328 outstandingCallables.add(currentScannerCallable); 329 cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); 330 } 331 332 private void addCallsForOtherReplicas( 333 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) { 334 335 for (int id = min; id <= max; id++) { 336 if (currentScannerCallable.id == id) { 337 continue; //this was already scheduled earlier 338 } 339 ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id); 340 setStartRowForReplicaCallable(s); 341 outstandingCallables.add(s); 342 RetryingRPC retryingOnReplica = new RetryingRPC(s); 343 cs.submit(retryingOnReplica, scannerTimeout, id); 344 } 345 } 346 347 /** 348 * Set the start row for the replica callable based on the state of the last result received. 349 * @param callable The callable to set the start row on 350 */ 351 private void setStartRowForReplicaCallable(ScannerCallable callable) { 352 if (this.lastResult == null || callable == null) { 353 return; 354 } 355 // 1. The last result was a partial result which means we have not received all of the cells 356 // for this row. Thus, use the last result's row as the start row. If a replica switch 357 // occurs, the scanner will ensure that any accumulated partial results are cleared, 358 // and the scan can resume from this row. 359 // 2. The last result was not a partial result which means it contained all of the cells for 360 // that row (we no longer need any information from it). Set the start row to the next 361 // closest row that could be seen. 362 callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.mayHaveMoreCellsInRow()); 363 } 364 365 boolean isAnyRPCcancelled() { 366 return someRPCcancelled; 367 } 368 369 class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable { 370 final ScannerCallable callable; 371 RpcRetryingCaller<Result[]> caller; 372 private volatile boolean cancelled = false; 373 374 RetryingRPC(ScannerCallable callable) { 375 this.callable = callable; 376 // For the Consistency.STRONG (default case), we reuse the caller 377 // to keep compatibility with what is done in the past 378 // For the Consistency.TIMELINE case, we can't reuse the caller 379 // since we could be making parallel RPCs (caller.callWithRetries is synchronized 380 // and we can't invoke it multiple times at the same time) 381 this.caller = ScannerCallableWithReplicas.this.caller; 382 if (scan.getConsistency() == Consistency.TIMELINE) { 383 this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf) 384 .<Result[]>newCaller(); 385 } 386 } 387 388 @Override 389 public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException { 390 // since the retries is done within the ResultBoundedCompletionService, 391 // we don't invoke callWithRetries here 392 if (cancelled) { 393 return null; 394 } 395 Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout); 396 return new Pair<>(res, this.callable); 397 } 398 399 @Override 400 public void prepare(boolean reload) throws IOException { 401 if (cancelled) return; 402 403 if (Thread.interrupted()) { 404 throw new InterruptedIOException(); 405 } 406 407 callable.prepare(reload); 408 } 409 410 @Override 411 public void throwable(Throwable t, boolean retrying) { 412 callable.throwable(t, retrying); 413 } 414 415 @Override 416 public String getExceptionMessageAdditionalDetail() { 417 return callable.getExceptionMessageAdditionalDetail(); 418 } 419 420 @Override 421 public long sleep(long pause, int tries) { 422 return callable.sleep(pause, tries); 423 } 424 425 @Override 426 public void cancel() { 427 cancelled = true; 428 caller.cancel(); 429 if (callable.getRpcController() != null) { 430 callable.getRpcController().startCancel(); 431 } 432 someRPCcancelled = true; 433 } 434 435 @Override 436 public boolean isCancelled() { 437 return cancelled; 438 } 439 } 440 441 @Override 442 public void prepare(boolean reload) throws IOException { 443 } 444 445 @Override 446 public void throwable(Throwable t, boolean retrying) { 447 currentScannerCallable.throwable(t, retrying); 448 } 449 450 @Override 451 public String getExceptionMessageAdditionalDetail() { 452 return currentScannerCallable.getExceptionMessageAdditionalDetail(); 453 } 454 455 @Override 456 public long sleep(long pause, int tries) { 457 return currentScannerCallable.sleep(pause, tries); 458 } 459}