001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.client; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.Date; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Optional; 032import java.util.Set; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.RejectedExecutionException; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicLong; 038import org.apache.hadoop.hbase.CallQueueTooBigException; 039import org.apache.hadoop.hbase.DoNotRetryIOException; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HRegionLocation; 042import org.apache.hadoop.hbase.RegionLocations; 043import org.apache.hadoop.hbase.RetryImmediatelyException; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 047import org.apache.hadoop.hbase.client.coprocessor.Batch; 048import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 049import org.apache.hadoop.hbase.trace.TraceUtil; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.htrace.core.Tracer; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057/** 058 * The context, and return value, for a single submit/submitAll call. 059 * Note on how this class (one AP submit) works. Initially, all requests are split into groups 060 * by server; request is sent to each server in parallel; the RPC calls are not async so a 061 * thread per server is used. Every time some actions fail, regions/locations might have 062 * changed, so we re-group them by server and region again and send these groups in parallel 063 * too. The result, in case of retries, is a "tree" of threads, with parent exiting after 064 * scheduling children. This is why lots of code doesn't require any synchronization. 065 */ 066@InterfaceAudience.Private 067class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { 068 069 private static final Logger LOG = LoggerFactory.getLogger(AsyncRequestFutureImpl.class); 070 071 private RetryingTimeTracker tracker; 072 073 /** 074 * Runnable (that can be submitted to thread pool) that waits for when it's time 075 * to issue replica calls, finds region replicas, groups the requests by replica and 076 * issues the calls (on separate threads, via sendMultiAction). 077 * This is done on a separate thread because we don't want to wait on user thread for 078 * our asynchronous call, and usually we have to wait before making replica calls. 079 */ 080 private final class ReplicaCallIssuingRunnable implements Runnable { 081 private final long startTime; 082 private final List<Action> initialActions; 083 084 public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) { 085 this.initialActions = initialActions; 086 this.startTime = startTime; 087 } 088 089 @Override 090 public void run() { 091 boolean done = false; 092 if (asyncProcess.primaryCallTimeoutMicroseconds > 0) { 093 try { 094 done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds); 095 } catch (InterruptedException ex) { 096 LOG.error("Replica thread interrupted - no replica calls {}", ex.getMessage()); 097 return; 098 } 099 } 100 if (done) return; // Done within primary timeout 101 Map<ServerName, MultiAction> actionsByServer = new HashMap<>(); 102 List<Action> unknownLocActions = new ArrayList<>(); 103 if (replicaGetIndices == null) { 104 for (int i = 0; i < results.length; ++i) { 105 addReplicaActions(i, actionsByServer, unknownLocActions); 106 } 107 } else { 108 for (int replicaGetIndice : replicaGetIndices) { 109 addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions); 110 } 111 } 112 if (!actionsByServer.isEmpty()) { 113 sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); 114 } 115 if (!unknownLocActions.isEmpty()) { 116 actionsByServer = new HashMap<>(); 117 for (Action action : unknownLocActions) { 118 addReplicaActionsAgain(action, actionsByServer); 119 } 120 // Some actions may have completely failed, they are handled inside addAgain. 121 if (!actionsByServer.isEmpty()) { 122 sendMultiAction(actionsByServer, 1, null, true); 123 } 124 } 125 } 126 127 /** 128 * Add replica actions to action map by server. 129 * @param index Index of the original action. 130 * @param actionsByServer The map by server to add it to. 131 */ 132 private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer, 133 List<Action> unknownReplicaActions) { 134 if (results[index] != null) return; // opportunistic. Never goes from non-null to null. 135 Action action = initialActions.get(index); 136 RegionLocations loc = findAllLocationsOrFail(action, true); 137 if (loc == null) return; 138 HRegionLocation[] locs = loc.getRegionLocations(); 139 if (locs.length == 1) { 140 LOG.warn("No replicas found for {}", action.getAction()); 141 return; 142 } 143 synchronized (replicaResultLock) { 144 // Don't run replica calls if the original has finished. We could do it e.g. if 145 // original has already failed before first replica call (unlikely given retries), 146 // but that would require additional synchronization w.r.t. returning to caller. 147 if (results[index] != null) return; 148 // We set the number of calls here. After that any path must call setResult/setError. 149 // True even for replicas that are not found - if we refuse to send we MUST set error. 150 updateResult(index, new ReplicaResultState(locs.length)); 151 } 152 for (int i = 1; i < locs.length; ++i) { 153 Action replicaAction = new Action(action, i); 154 if (locs[i] != null) { 155 asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), 156 replicaAction, actionsByServer, nonceGroup); 157 } else { 158 unknownReplicaActions.add(replicaAction); 159 } 160 } 161 } 162 163 private void addReplicaActionsAgain( 164 Action action, Map<ServerName, MultiAction> actionsByServer) { 165 if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) { 166 throw new AssertionError("Cannot have default replica here"); 167 } 168 HRegionLocation loc = getReplicaLocationOrFail(action); 169 if (loc == null) return; 170 asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), 171 action, actionsByServer, nonceGroup); 172 } 173 } 174 175 /** 176 * Runnable (that can be submitted to thread pool) that submits MultiAction to a 177 * single server. The server call is synchronous, therefore we do it on a thread pool. 178 */ 179 final class SingleServerRequestRunnable implements Runnable { 180 private final MultiAction multiAction; 181 private final int numAttempt; 182 private final ServerName server; 183 private final Set<CancellableRegionServerCallable> callsInProgress; 184 SingleServerRequestRunnable( 185 MultiAction multiAction, int numAttempt, ServerName server, 186 Set<CancellableRegionServerCallable> callsInProgress) { 187 this.multiAction = multiAction; 188 this.numAttempt = numAttempt; 189 this.server = server; 190 this.callsInProgress = callsInProgress; 191 } 192 193 @Override 194 public void run() { 195 AbstractResponse res = null; 196 CancellableRegionServerCallable callable = currentCallable; 197 try { 198 // setup the callable based on the actions, if we don't have one already from the request 199 if (callable == null) { 200 callable = createCallable(server, tableName, multiAction); 201 } 202 RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable,rpcTimeout); 203 try { 204 if (callsInProgress != null) { 205 callsInProgress.add(callable); 206 } 207 res = caller.callWithoutRetries(callable, operationTimeout); 208 if (res == null) { 209 // Cancelled 210 return; 211 } 212 } catch (IOException e) { 213 // The service itself failed . It may be an error coming from the communication 214 // layer, but, as well, a functional error raised by the server. 215 receiveGlobalFailure(multiAction, server, numAttempt, e); 216 return; 217 } catch (Throwable t) { 218 // This should not happen. Let's log & retry anyway. 219 LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected." + 220 " Retrying. Server=" + server + ", tableName=" + tableName, t); 221 receiveGlobalFailure(multiAction, server, numAttempt, t); 222 return; 223 } 224 if (res.type() == AbstractResponse.ResponseType.MULTI) { 225 // Normal case: we received an answer from the server, and it's not an exception. 226 receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt); 227 } else { 228 if (results != null) { 229 SingleResponse singleResponse = (SingleResponse) res; 230 updateResult(0, singleResponse.getEntry()); 231 } 232 decActionCounter(1); 233 } 234 } catch (Throwable t) { 235 // Something really bad happened. We are on the send thread that will now die. 236 LOG.error("id=" + asyncProcess.id + " error for " + tableName + " processing " + server, t); 237 throw new RuntimeException(t); 238 } finally { 239 asyncProcess.decTaskCounters(multiAction.getRegions(), server); 240 if (callsInProgress != null && callable != null && res != null) { 241 callsInProgress.remove(callable); 242 } 243 } 244 } 245 } 246 247 private final Batch.Callback<CResult> callback; 248 private final BatchErrors errors; 249 private final ConnectionImplementation.ServerErrorTracker errorsByServer; 250 private final ExecutorService pool; 251 private final Set<CancellableRegionServerCallable> callsInProgress; 252 253 254 private final TableName tableName; 255 private final AtomicLong actionsInProgress = new AtomicLong(-1); 256 /** 257 * The lock controls access to results. It is only held when populating results where 258 * there might be several callers (eventual consistency gets). For other requests, 259 * there's one unique call going on per result index. 260 */ 261 private final Object replicaResultLock = new Object(); 262 /** 263 * Result array. Null if results are not needed. Otherwise, each index corresponds to 264 * the action index in initial actions submitted. For most request types, has null-s for 265 * requests that are not done, and result/exception for those that are done. 266 * For eventual-consistency gets, initially the same applies; at some point, replica calls 267 * might be started, and ReplicaResultState is put at the corresponding indices. The 268 * returning calls check the type to detect when this is the case. After all calls are done, 269 * ReplicaResultState-s are replaced with results for the user. 270 */ 271 private final Object[] results; 272 /** 273 * Indices of replica gets in results. If null, all or no actions are replica-gets. 274 */ 275 private final int[] replicaGetIndices; 276 private final boolean hasAnyReplicaGets; 277 private final long nonceGroup; 278 private final CancellableRegionServerCallable currentCallable; 279 private final int operationTimeout; 280 private final int rpcTimeout; 281 private final AsyncProcess asyncProcess; 282 283 /** 284 * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only 285 * used to make logging more clear, we don't actually care why we don't retry. 286 */ 287 public enum Retry { 288 YES, 289 NO_LOCATION_PROBLEM, 290 NO_NOT_RETRIABLE, 291 NO_RETRIES_EXHAUSTED, 292 NO_OTHER_SUCCEEDED 293 } 294 295 /** Sync point for calls to multiple replicas for the same user request (Get). 296 * Created and put in the results array (we assume replica calls require results) when 297 * the replica calls are launched. See results for details of this process. 298 * POJO, all fields are public. To modify them, the object itself is locked. */ 299 private static class ReplicaResultState { 300 public ReplicaResultState(int callCount) { 301 this.callCount = callCount; 302 } 303 304 /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ 305 int callCount; 306 /** Errors for which it is not decided whether we will report them to user. If one of the 307 * calls succeeds, we will discard the errors that may have happened in the other calls. */ 308 BatchErrors replicaErrors = null; 309 310 @Override 311 public String toString() { 312 return "[call count " + callCount + "; errors " + replicaErrors + "]"; 313 } 314 } 315 316 public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, 317 long nonceGroup, AsyncProcess asyncProcess) { 318 this.pool = task.getPool(); 319 this.callback = task.getCallback(); 320 this.nonceGroup = nonceGroup; 321 this.tableName = task.getTableName(); 322 this.actionsInProgress.set(actions.size()); 323 if (task.getResults() == null) { 324 results = task.getNeedResults() ? new Object[actions.size()] : null; 325 } else { 326 if (task.getResults().length != actions.size()) { 327 throw new AssertionError("results.length"); 328 } 329 this.results = task.getResults(); 330 for (int i = 0; i != this.results.length; ++i) { 331 results[i] = null; 332 } 333 } 334 List<Integer> replicaGetIndices = null; 335 boolean hasAnyReplicaGets = false; 336 if (results != null) { 337 // Check to see if any requests might require replica calls. 338 // We expect that many requests will consist of all or no multi-replica gets; in such 339 // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will 340 // store the list of action indexes for which replica gets are possible, and set 341 // hasAnyReplicaGets to true. 342 boolean hasAnyNonReplicaReqs = false; 343 int posInList = 0; 344 for (Action action : actions) { 345 boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction()); 346 if (isReplicaGet) { 347 hasAnyReplicaGets = true; 348 if (hasAnyNonReplicaReqs) { // Mixed case 349 if (replicaGetIndices == null) { 350 replicaGetIndices = new ArrayList<>(actions.size() - 1); 351 } 352 replicaGetIndices.add(posInList); 353 } 354 } else if (!hasAnyNonReplicaReqs) { 355 // The first non-multi-replica request in the action list. 356 hasAnyNonReplicaReqs = true; 357 if (posInList > 0) { 358 // Add all the previous requests to the index lists. We know they are all 359 // replica-gets because this is the first non-multi-replica request in the list. 360 replicaGetIndices = new ArrayList<>(actions.size() - 1); 361 for (int i = 0; i < posInList; ++i) { 362 replicaGetIndices.add(i); 363 } 364 } 365 } 366 ++posInList; 367 } 368 } 369 this.hasAnyReplicaGets = hasAnyReplicaGets; 370 if (replicaGetIndices != null) { 371 this.replicaGetIndices = new int[replicaGetIndices.size()]; 372 int i = 0; 373 for (Integer el : replicaGetIndices) { 374 this.replicaGetIndices[i++] = el; 375 } 376 } else { 377 this.replicaGetIndices = null; 378 } 379 this.callsInProgress = !hasAnyReplicaGets ? null : 380 Collections.newSetFromMap( 381 new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>()); 382 this.asyncProcess = asyncProcess; 383 this.errorsByServer = createServerErrorTracker(); 384 this.errors = new BatchErrors(); 385 this.operationTimeout = task.getOperationTimeout(); 386 this.rpcTimeout = task.getRpcTimeout(); 387 this.currentCallable = task.getCallable(); 388 if (task.getCallable() == null) { 389 tracker = new RetryingTimeTracker().start(); 390 } 391 } 392 393 protected Set<CancellableRegionServerCallable> getCallsInProgress() { 394 return callsInProgress; 395 } 396 397 SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server, 398 Set<CancellableRegionServerCallable> callsInProgress) { 399 return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress); 400 } 401 402 /** 403 * Group a list of actions per region servers, and send them. 404 * 405 * @param currentActions - the list of row to submit 406 * @param numAttempt - the current numAttempt (first attempt is 1) 407 */ 408 void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) { 409 Map<ServerName, MultiAction> actionsByServer = new HashMap<>(); 410 411 boolean isReplica = false; 412 List<Action> unknownReplicaActions = null; 413 for (Action action : currentActions) { 414 RegionLocations locs = findAllLocationsOrFail(action, true); 415 if (locs == null) continue; 416 boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); 417 if (isReplica && !isReplicaAction) { 418 // This is the property of the current implementation, not a requirement. 419 throw new AssertionError("Replica and non-replica actions in the same retry"); 420 } 421 isReplica = isReplicaAction; 422 HRegionLocation loc = locs.getRegionLocation(action.getReplicaId()); 423 if (loc == null || loc.getServerName() == null) { 424 if (isReplica) { 425 if (unknownReplicaActions == null) { 426 unknownReplicaActions = new ArrayList<>(1); 427 } 428 unknownReplicaActions.add(action); 429 } else { 430 // TODO: relies on primary location always being fetched 431 manageLocationError(action, null); 432 } 433 } else { 434 byte[] regionName = loc.getRegionInfo().getRegionName(); 435 AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); 436 } 437 } 438 boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets); 439 boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty(); 440 441 if (!actionsByServer.isEmpty()) { 442 // If this is a first attempt to group and send, no replicas, we need replica thread. 443 sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown) 444 ? currentActions : null, numAttempt > 1 && !hasUnknown); 445 } 446 447 if (hasUnknown) { 448 actionsByServer = new HashMap<>(); 449 for (Action action : unknownReplicaActions) { 450 HRegionLocation loc = getReplicaLocationOrFail(action); 451 if (loc == null) continue; 452 byte[] regionName = loc.getRegionInfo().getRegionName(); 453 AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); 454 } 455 if (!actionsByServer.isEmpty()) { 456 sendMultiAction( 457 actionsByServer, numAttempt, doStartReplica ? currentActions : null, true); 458 } 459 } 460 } 461 462 private HRegionLocation getReplicaLocationOrFail(Action action) { 463 // We are going to try get location once again. For each action, we'll do it once 464 // from cache, because the previous calls in the loop might populate it. 465 int replicaId = action.getReplicaId(); 466 RegionLocations locs = findAllLocationsOrFail(action, true); 467 if (locs == null) return null; // manageError already called 468 HRegionLocation loc = locs.getRegionLocation(replicaId); 469 if (loc == null || loc.getServerName() == null) { 470 locs = findAllLocationsOrFail(action, false); 471 if (locs == null) return null; // manageError already called 472 loc = locs.getRegionLocation(replicaId); 473 } 474 if (loc == null || loc.getServerName() == null) { 475 manageLocationError(action, null); 476 return null; 477 } 478 return loc; 479 } 480 481 private void manageLocationError(Action action, Exception ex) { 482 String msg = "Cannot get replica " + action.getReplicaId() 483 + " location for " + action.getAction(); 484 LOG.error(msg); 485 if (ex == null) { 486 ex = new IOException(msg); 487 } 488 manageError(action.getOriginalIndex(), action.getAction(), 489 Retry.NO_LOCATION_PROBLEM, ex, null); 490 } 491 492 private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) { 493 if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id + 494 ", row cannot be null"); 495 RegionLocations loc = null; 496 try { 497 loc = asyncProcess.connection.locateRegion( 498 tableName, action.getAction().getRow(), useCache, true, action.getReplicaId()); 499 } catch (IOException ex) { 500 manageLocationError(action, ex); 501 } 502 return loc; 503 } 504 505 /** 506 * Send a multi action structure to the servers, after a delay depending on the attempt 507 * number. Asynchronous. 508 * 509 * @param actionsByServer the actions structured by regions 510 * @param numAttempt the attempt number. 511 * @param actionsForReplicaThread original actions for replica thread; null on non-first call. 512 */ 513 void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, 514 int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) { 515 // Run the last item on the same thread if we are already on a send thread. 516 // We hope most of the time it will be the only item, so we can cut down on threads. 517 int actionsRemaining = actionsByServer.size(); 518 // This iteration is by server (the HRegionLocation comparator is by server portion only). 519 for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) { 520 ServerName server = e.getKey(); 521 MultiAction multiAction = e.getValue(); 522 Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction, 523 numAttempt); 524 // make sure we correctly count the number of runnables before we try to reuse the send 525 // thread, in case we had to split the request into different runnables because of backoff 526 if (runnables.size() > actionsRemaining) { 527 actionsRemaining = runnables.size(); 528 } 529 530 // run all the runnables 531 // HBASE-17475: Do not reuse the thread after stack reach a certain depth to prevent stack overflow 532 // for now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to control the depth 533 for (Runnable runnable : runnables) { 534 if ((--actionsRemaining == 0) && reuseThread 535 && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) { 536 runnable.run(); 537 } else { 538 try { 539 pool.submit(runnable); 540 } catch (Throwable t) { 541 if (t instanceof RejectedExecutionException) { 542 // This should never happen. But as the pool is provided by the end user, 543 // let's secure this a little. 544 LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + 545 " Server=" + server.getServerName(), t); 546 } else { 547 // see #HBASE-14359 for more details 548 LOG.warn("Caught unexpected exception/error: ", t); 549 } 550 asyncProcess.decTaskCounters(multiAction.getRegions(), server); 551 // We're likely to fail again, but this will increment the attempt counter, 552 // so it will finish. 553 receiveGlobalFailure(multiAction, server, numAttempt, t); 554 } 555 } 556 } 557 } 558 559 if (actionsForReplicaThread != null) { 560 startWaitingForReplicaCalls(actionsForReplicaThread); 561 } 562 } 563 564 private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server, 565 MultiAction multiAction, 566 int numAttempt) { 567 // no stats to manage, just do the standard action 568 if (asyncProcess.connection.getStatisticsTracker() == null) { 569 if (asyncProcess.connection.getConnectionMetrics() != null) { 570 asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); 571 } 572 asyncProcess.incTaskCounters(multiAction.getRegions(), server); 573 SingleServerRequestRunnable runnable = createSingleServerRequest( 574 multiAction, numAttempt, server, callsInProgress); 575 Tracer tracer = Tracer.curThreadTracer(); 576 577 if (tracer == null) { 578 return Collections.singletonList(runnable); 579 } else { 580 return Collections.singletonList(tracer.wrap(runnable, "AsyncProcess.sendMultiAction")); 581 } 582 } 583 584 // group the actions by the amount of delay 585 Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size()); 586 587 // split up the actions 588 for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) { 589 Long backoff = getBackoff(server, e.getKey()); 590 DelayingRunner runner = actions.get(backoff); 591 if (runner == null) { 592 actions.put(backoff, new DelayingRunner(backoff, e)); 593 } else { 594 runner.add(e); 595 } 596 } 597 598 List<Runnable> toReturn = new ArrayList<>(actions.size()); 599 for (DelayingRunner runner : actions.values()) { 600 asyncProcess.incTaskCounters(runner.getActions().getRegions(), server); 601 String traceText = "AsyncProcess.sendMultiAction"; 602 Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress); 603 // use a delay runner only if we need to sleep for some time 604 if (runner.getSleepTime() > 0) { 605 runner.setRunner(runnable); 606 traceText = "AsyncProcess.clientBackoff.sendMultiAction"; 607 runnable = runner; 608 if (asyncProcess.connection.getConnectionMetrics() != null) { 609 asyncProcess.connection.getConnectionMetrics() 610 .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime()); 611 } 612 } else { 613 if (asyncProcess.connection.getConnectionMetrics() != null) { 614 asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); 615 } 616 } 617 runnable = TraceUtil.wrap(runnable, traceText); 618 toReturn.add(runnable); 619 620 } 621 return toReturn; 622 } 623 624 /** 625 * @param server server location where the target region is hosted 626 * @param regionName name of the region which we are going to write some data 627 * @return the amount of time the client should wait until it submit a request to the 628 * specified server and region 629 */ 630 private Long getBackoff(ServerName server, byte[] regionName) { 631 ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker(); 632 ServerStatistics stats = tracker.getStats(server); 633 return asyncProcess.connection.getBackoffPolicy() 634 .getBackoffTime(server, regionName, stats); 635 } 636 637 /** 638 * Starts waiting to issue replica calls on a different thread; or issues them immediately. 639 */ 640 private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) { 641 long startTime = EnvironmentEdgeManager.currentTime(); 642 ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable( 643 actionsForReplicaThread, startTime); 644 if (asyncProcess.primaryCallTimeoutMicroseconds == 0) { 645 // Start replica calls immediately. 646 replicaRunnable.run(); 647 } else { 648 // Start the thread that may kick off replica gets. 649 // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea. 650 try { 651 pool.submit(replicaRunnable); 652 } catch (RejectedExecutionException ree) { 653 LOG.warn("id=" + asyncProcess.id + " replica task rejected by pool; no replica calls", ree); 654 } 655 } 656 } 657 658 /** 659 * Check that we can retry acts accordingly: logs, set the error status. 660 * 661 * @param originalIndex the position in the list sent 662 * @param row the row 663 * @param canRetry if false, we won't retry whatever the settings. 664 * @param throwable the throwable, if any (can be null) 665 * @param server the location, if any (can be null) 666 * @return true if the action can be retried, false otherwise. 667 */ 668 Retry manageError(int originalIndex, Row row, Retry canRetry, 669 Throwable throwable, ServerName server) { 670 if (canRetry == Retry.YES 671 && throwable != null && throwable instanceof DoNotRetryIOException) { 672 canRetry = Retry.NO_NOT_RETRIABLE; 673 } 674 675 if (canRetry != Retry.YES) { 676 // Batch.Callback<Res> was not called on failure in 0.94. We keep this. 677 setError(originalIndex, row, throwable, server); 678 } else if (isActionComplete(originalIndex, row)) { 679 canRetry = Retry.NO_OTHER_SUCCEEDED; 680 } 681 return canRetry; 682 } 683 684 /** 685 * Resubmit all the actions from this multiaction after a failure. 686 * 687 * @param rsActions the actions still to do from the initial list 688 * @param server the destination 689 * @param numAttempt the number of attempts so far 690 * @param t the throwable (if any) that caused the resubmit 691 */ 692 private void receiveGlobalFailure( 693 MultiAction rsActions, ServerName server, int numAttempt, Throwable t) { 694 errorsByServer.reportServerError(server); 695 Retry canRetry = errorsByServer.canTryMore(numAttempt) 696 ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; 697 698 cleanServerCache(server, t); 699 int failed = 0; 700 int stopped = 0; 701 List<Action> toReplay = new ArrayList<>(); 702 for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) { 703 byte[] regionName = e.getKey(); 704 byte[] row = e.getValue().get(0).getAction().getRow(); 705 // Do not use the exception for updating cache because it might be coming from 706 // any of the regions in the MultiAction. 707 updateCachedLocations(server, regionName, row, 708 ClientExceptionsUtil.isMetaClearingException(t) ? null : t); 709 for (Action action : e.getValue()) { 710 Retry retry = manageError( 711 action.getOriginalIndex(), action.getAction(), canRetry, t, server); 712 if (retry == Retry.YES) { 713 toReplay.add(action); 714 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { 715 ++stopped; 716 } else { 717 ++failed; 718 } 719 } 720 } 721 722 if (toReplay.isEmpty()) { 723 logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped); 724 } else { 725 resubmit(server, toReplay, numAttempt, rsActions.size(), t); 726 } 727 } 728 729 /** 730 * Log as much info as possible, and, if there is something to replay, 731 * submit it again after a back off sleep. 732 */ 733 private void resubmit(ServerName oldServer, List<Action> toReplay, 734 int numAttempt, int failureCount, Throwable throwable) { 735 // We have something to replay. We're going to sleep a little before. 736 737 // We have two contradicting needs here: 738 // 1) We want to get the new location after having slept, as it may change. 739 // 2) We want to take into account the location when calculating the sleep time. 740 // 3) If all this is just because the response needed to be chunked try again FAST. 741 // It should be possible to have some heuristics to take the right decision. Short term, 742 // we go for one. 743 boolean retryImmediately = throwable instanceof RetryImmediatelyException; 744 int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; 745 long backOffTime; 746 if (retryImmediately) { 747 backOffTime = 0; 748 } else if (throwable instanceof CallQueueTooBigException) { 749 // Give a special check on CQTBE, see #HBASE-17114 750 backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE); 751 } else { 752 backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause); 753 } 754 if (numAttempt > asyncProcess.startLogErrorsCnt) { 755 // We use this value to have some logs when we have multiple failures, but not too many 756 // logs, as errors are to be expected when a region moves, splits and so on 757 LOG.info(createLog(numAttempt, failureCount, toReplay.size(), 758 oldServer, throwable, backOffTime, true, null, -1, -1)); 759 } 760 761 try { 762 if (backOffTime > 0) { 763 Thread.sleep(backOffTime); 764 } 765 } catch (InterruptedException e) { 766 LOG.warn("#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e); 767 Thread.currentThread().interrupt(); 768 return; 769 } 770 771 groupAndSendMultiAction(toReplay, nextAttemptNumber); 772 } 773 774 private void logNoResubmit(ServerName oldServer, int numAttempt, 775 int failureCount, Throwable throwable, int failed, int stopped) { 776 if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) { 777 String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString(); 778 String logMessage = createLog(numAttempt, failureCount, 0, oldServer, 779 throwable, -1, false, timeStr, failed, stopped); 780 if (failed != 0) { 781 // Only log final failures as warning 782 LOG.warn(logMessage); 783 } else { 784 LOG.info(logMessage); 785 } 786 } 787 } 788 789 /** 790 * Called when we receive the result of a server query. 791 * 792 * @param multiAction - the multiAction we sent 793 * @param server - the location. It's used as a server name. 794 * @param responses - the response, if any 795 * @param numAttempt - the attempt 796 */ 797 private void receiveMultiAction(MultiAction multiAction, ServerName server, 798 MultiResponse responses, int numAttempt) { 799 assert responses != null; 800 updateStats(server, responses); 801 // Success or partial success 802 // Analyze detailed results. We can still have individual failures to be redo. 803 // two specific throwables are managed: 804 // - DoNotRetryIOException: we continue to retry for other actions 805 // - RegionMovedException: we update the cache with the new region location 806 Map<byte[], MultiResponse.RegionResult> results = responses.getResults(); 807 List<Action> toReplay = new ArrayList<>(); 808 Throwable lastException = null; 809 int failureCount = 0; 810 int failed = 0; 811 int stopped = 0; 812 Retry retry = null; 813 // Go by original action. 814 for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) { 815 byte[] regionName = regionEntry.getKey(); 816 817 Throwable regionException = responses.getExceptions().get(regionName); 818 if (regionException != null) { 819 cleanServerCache(server, regionException); 820 } 821 822 Map<Integer, Object> regionResults = 823 results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap(); 824 boolean regionFailureRegistered = false; 825 for (Action sentAction : regionEntry.getValue()) { 826 Object result = regionResults.get(sentAction.getOriginalIndex()); 827 if (result == null) { 828 if (regionException == null) { 829 LOG.error("Server sent us neither results nor exceptions for " 830 + Bytes.toStringBinary(regionName) 831 + ", numAttempt:" + numAttempt); 832 regionException = new RuntimeException("Invalid response"); 833 } 834 // If the row operation encounters the region-lever error, the exception of action may be 835 // null. 836 result = regionException; 837 } 838 // Failure: retry if it's make sense else update the errors lists 839 if (result instanceof Throwable) { 840 Throwable actionException = (Throwable) result; 841 Row row = sentAction.getAction(); 842 lastException = regionException != null ? regionException 843 : ClientExceptionsUtil.findException(actionException); 844 // Register corresponding failures once per server/once per region. 845 if (!regionFailureRegistered) { 846 regionFailureRegistered = true; 847 updateCachedLocations(server, regionName, row.getRow(), actionException); 848 } 849 if (retry == null) { 850 errorsByServer.reportServerError(server); 851 // We determine canRetry only once for all calls, after reporting server failure. 852 retry = errorsByServer.canTryMore(numAttempt) ? 853 Retry.YES : Retry.NO_RETRIES_EXHAUSTED; 854 } 855 ++failureCount; 856 switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException, 857 server)) { 858 case YES: 859 toReplay.add(sentAction); 860 break; 861 case NO_OTHER_SUCCEEDED: 862 ++stopped; 863 break; 864 default: 865 ++failed; 866 break; 867 } 868 } else { 869 invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result); 870 setResult(sentAction, result); 871 } 872 } 873 } 874 if (toReplay.isEmpty()) { 875 logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped); 876 } else { 877 resubmit(server, toReplay, numAttempt, failureCount, lastException); 878 } 879 } 880 881 private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row, 882 Throwable rowException) { 883 if (tableName == null) { 884 return; 885 } 886 try { 887 asyncProcess.connection 888 .updateCachedLocations(tableName, regionName, row, rowException, server); 889 } catch (Throwable ex) { 890 // That should never happen, but if it did, we want to make sure 891 // we still process errors 892 LOG.error("Couldn't update cached region locations: " + ex); 893 } 894 } 895 896 private void invokeCallBack(byte[] regionName, byte[] row, CResult result) { 897 if (callback != null) { 898 try { 899 //noinspection unchecked 900 // TODO: would callback expect a replica region name if it gets one? 901 this.callback.update(regionName, row, result); 902 } catch (Throwable t) { 903 LOG.error("User callback threw an exception for " 904 + Bytes.toStringBinary(regionName) + ", ignoring", t); 905 } 906 } 907 } 908 909 private void cleanServerCache(ServerName server, Throwable regionException) { 910 if (ClientExceptionsUtil.isMetaClearingException(regionException)) { 911 // We want to make sure to clear the cache in case there were location-related exceptions. 912 // We don't to clear the cache for every possible exception that comes through, however. 913 asyncProcess.connection.clearCaches(server); 914 } 915 } 916 917 protected void updateStats(ServerName server, MultiResponse resp) { 918 ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()), 919 Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp); 920 } 921 922 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, 923 Throwable error, long backOffTime, boolean willRetry, String startTime, 924 int failed, int stopped) { 925 StringBuilder sb = new StringBuilder(); 926 sb.append("id=").append(asyncProcess.id).append(", table=").append(tableName). 927 append(", attempt=").append(numAttempt).append("/").append(asyncProcess.numTries). 928 append(", "); 929 930 if (failureCount > 0 || error != null){ 931 sb.append("failureCount=").append(failureCount).append("ops").append(", last exception="). 932 append(error); 933 } else { 934 sb.append("succeeded"); 935 } 936 937 sb.append(" on ").append(sn).append(", tracking started ").append(startTime); 938 939 if (willRetry) { 940 sb.append(", retrying after=").append(backOffTime).append("ms"). 941 append(", operationsToReplay=").append(replaySize); 942 } else if (failureCount > 0) { 943 if (stopped > 0) { 944 sb.append("; NOT retrying, stopped=").append(stopped). 945 append(" because successful operation on other replica"); 946 } 947 if (failed > 0) { 948 sb.append("; NOT retrying, failed=").append(failed).append(" -- final attempt!"); 949 } 950 } 951 952 return sb.toString(); 953 } 954 955 /** 956 * Sets the non-error result from a particular action. 957 * @param action Action (request) that the server responded to. 958 * @param result The result. 959 */ 960 private void setResult(Action action, Object result) { 961 if (result == null) { 962 throw new RuntimeException("Result cannot be null"); 963 } 964 ReplicaResultState state = null; 965 boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); 966 int index = action.getOriginalIndex(); 967 if (results == null) { 968 decActionCounter(index); 969 return; // Simple case, no replica requests. 970 } 971 state = trySetResultSimple(index, action.getAction(), false, result, null, isStale); 972 if (state == null) { 973 return; // Simple case, no replica requests. 974 } 975 // At this point we know that state is set to replica tracking class. 976 // It could be that someone else is also looking at it; however, we know there can 977 // only be one state object, and only one thread can set callCount to 0. Other threads 978 // will either see state with callCount 0 after locking it; or will not see state at all 979 // we will replace it with the result. 980 synchronized (state) { 981 if (state.callCount == 0) { 982 return; // someone already set the result 983 } 984 state.callCount = 0; 985 } 986 synchronized (replicaResultLock) { 987 if (results[index] != state) { 988 throw new AssertionError("We set the callCount but someone else replaced the result"); 989 } 990 updateResult(index, result); 991 } 992 993 decActionCounter(index); 994 } 995 996 /** 997 * Sets the error from a particular action. 998 * @param index Original action index. 999 * @param row Original request. 1000 * @param throwable The resulting error. 1001 * @param server The source server. 1002 */ 1003 private void setError(int index, Row row, Throwable throwable, ServerName server) { 1004 ReplicaResultState state = null; 1005 if (results == null) { 1006 // Note that we currently cannot have replica requests with null results. So it shouldn't 1007 // happen that multiple replica calls will call dAC for same actions with results == null. 1008 // Only one call per action should be present in this case. 1009 errors.add(throwable, row, server); 1010 decActionCounter(index); 1011 return; // Simple case, no replica requests. 1012 } 1013 state = trySetResultSimple(index, row, true, throwable, server, false); 1014 if (state == null) { 1015 return; // Simple case, no replica requests. 1016 } 1017 BatchErrors target = null; // Error will be added to final errors, or temp replica errors. 1018 boolean isActionDone = false; 1019 synchronized (state) { 1020 switch (state.callCount) { 1021 case 0: return; // someone already set the result 1022 case 1: { // All calls failed, we are the last error. 1023 target = errors; 1024 isActionDone = true; 1025 break; 1026 } 1027 default: { 1028 assert state.callCount > 1; 1029 if (state.replicaErrors == null) { 1030 state.replicaErrors = new BatchErrors(); 1031 } 1032 target = state.replicaErrors; 1033 break; 1034 } 1035 } 1036 --state.callCount; 1037 } 1038 target.add(throwable, row, server); 1039 if (isActionDone) { 1040 if (state.replicaErrors != null) { // last call, no need to lock 1041 errors.merge(state.replicaErrors); 1042 } 1043 // See setResult for explanations. 1044 synchronized (replicaResultLock) { 1045 if (results[index] != state) { 1046 throw new AssertionError("We set the callCount but someone else replaced the result"); 1047 } 1048 updateResult(index, throwable); 1049 } 1050 decActionCounter(index); 1051 } 1052 } 1053 1054 /** 1055 * Checks if the action is complete; used on error to prevent needless retries. 1056 * Does not synchronize, assuming element index/field accesses are atomic. 1057 * This is an opportunistic optimization check, doesn't have to be strict. 1058 * @param index Original action index. 1059 * @param row Original request. 1060 */ 1061 private boolean isActionComplete(int index, Row row) { 1062 if (!AsyncProcess.isReplicaGet(row)) return false; 1063 Object resObj = results[index]; 1064 return (resObj != null) && (!(resObj instanceof ReplicaResultState) 1065 || ((ReplicaResultState)resObj).callCount == 0); 1066 } 1067 1068 /** 1069 * Tries to set the result or error for a particular action as if there were no replica calls. 1070 * @return null if successful; replica state if there were in fact replica calls. 1071 */ 1072 private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, 1073 Object result, ServerName server, boolean isFromReplica) { 1074 Object resObj = null; 1075 if (!AsyncProcess.isReplicaGet(row)) { 1076 if (isFromReplica) { 1077 throw new AssertionError("Unexpected stale result for " + row); 1078 } 1079 updateResult(index, result); 1080 } else { 1081 synchronized (replicaResultLock) { 1082 resObj = results[index]; 1083 if (resObj == null) { 1084 if (isFromReplica) { 1085 throw new AssertionError("Unexpected stale result for " + row); 1086 } 1087 updateResult(index, result); 1088 } 1089 } 1090 } 1091 1092 ReplicaResultState rrs = 1093 (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; 1094 if (rrs == null && isError) { 1095 // The resObj is not replica state (null or already set). 1096 errors.add((Throwable)result, row, server); 1097 } 1098 1099 if (resObj == null) { 1100 // resObj is null - no replica calls were made. 1101 decActionCounter(index); 1102 return null; 1103 } 1104 return rrs; 1105 } 1106 1107 private void decActionCounter(int index) { 1108 long actionsRemaining = actionsInProgress.decrementAndGet(); 1109 if (actionsRemaining < 0) { 1110 String error = buildDetailedErrorMsg("Incorrect actions in progress", index); 1111 throw new AssertionError(error); 1112 } else if (actionsRemaining == 0) { 1113 synchronized (actionsInProgress) { 1114 actionsInProgress.notifyAll(); 1115 } 1116 } 1117 } 1118 1119 private String buildDetailedErrorMsg(String string, int index) { 1120 StringBuilder error = new StringBuilder(128); 1121 error.append(string).append("; called for ").append(index).append(", actionsInProgress ") 1122 .append(actionsInProgress.get()).append("; replica gets: "); 1123 if (replicaGetIndices != null) { 1124 for (int i = 0; i < replicaGetIndices.length; ++i) { 1125 error.append(replicaGetIndices[i]).append(", "); 1126 } 1127 } else { 1128 error.append(hasAnyReplicaGets ? "all" : "none"); 1129 } 1130 error.append("; results "); 1131 if (results != null) { 1132 for (int i = 0; i < results.length; ++i) { 1133 Object o = results[i]; 1134 error.append(((o == null) ? "null" : o.toString())).append(", "); 1135 } 1136 } 1137 return error.toString(); 1138 } 1139 1140 @Override 1141 public void waitUntilDone() throws InterruptedIOException { 1142 try { 1143 waitUntilDone(Long.MAX_VALUE); 1144 } catch (InterruptedException iex) { 1145 throw new InterruptedIOException(iex.getMessage()); 1146 } finally { 1147 if (callsInProgress != null) { 1148 for (CancellableRegionServerCallable clb : callsInProgress) { 1149 clb.cancel(); 1150 } 1151 } 1152 } 1153 } 1154 1155 private boolean waitUntilDone(long cutoff) throws InterruptedException { 1156 boolean hasWait = cutoff != Long.MAX_VALUE; 1157 long lastLog = EnvironmentEdgeManager.currentTime(); 1158 long currentInProgress; 1159 while (0 != (currentInProgress = actionsInProgress.get())) { 1160 long now = EnvironmentEdgeManager.currentTime(); 1161 if (hasWait && (now * 1000L) > cutoff) { 1162 return false; 1163 } 1164 if (!hasWait) { // Only log if wait is infinite. 1165 if (now > lastLog + 10000) { 1166 lastLog = now; 1167 LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress 1168 + " actions to finish on table: " + tableName); 1169 } 1170 } 1171 synchronized (actionsInProgress) { 1172 if (actionsInProgress.get() == 0) break; 1173 if (!hasWait) { 1174 actionsInProgress.wait(10); 1175 } else { 1176 long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); 1177 TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); 1178 } 1179 } 1180 } 1181 return true; 1182 } 1183 1184 @Override 1185 public boolean hasError() { 1186 return errors.hasErrors(); 1187 } 1188 1189 @Override 1190 public List<? extends Row> getFailedOperations() { 1191 return errors.actions; 1192 } 1193 1194 @Override 1195 public RetriesExhaustedWithDetailsException getErrors() { 1196 return errors.makeException(asyncProcess.logBatchErrorDetails); 1197 } 1198 1199 @Override 1200 public Object[] getResults() throws InterruptedIOException { 1201 waitUntilDone(); 1202 return results; 1203 } 1204 1205 /** 1206 * Creates the server error tracker to use inside process. 1207 * Currently, to preserve the main assumption about current retries, and to work well with 1208 * the retry-limit-based calculation, the calculation is local per Process object. 1209 * We may benefit from connection-wide tracking of server errors. 1210 * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection 1211 */ 1212 private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { 1213 return new ConnectionImplementation.ServerErrorTracker( 1214 asyncProcess.serverTrackerTimeout, asyncProcess.numTries); 1215 } 1216 1217 /** 1218 * Create a callable. Isolated to be easily overridden in the tests. 1219 */ 1220 private MultiServerCallable createCallable(final ServerName server, TableName tableName, 1221 final MultiAction multi) { 1222 return new MultiServerCallable(asyncProcess.connection, tableName, server, 1223 multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority()); 1224 } 1225 1226 private void updateResult(int index, Object result) { 1227 Object current = results[index]; 1228 if (current != null) { 1229 if (LOG.isDebugEnabled()) { 1230 LOG.debug("The result is assigned repeatedly! current:" + current 1231 + ", new:" + result); 1232 } 1233 } 1234 results[index] = result; 1235 } 1236 1237 long getNumberOfActionsInProgress() { 1238 return actionsInProgress.get(); 1239 } 1240}