001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.client; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.Date; 028import java.util.HashMap; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.concurrent.ConcurrentHashMap; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.RejectedExecutionException; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicLong; 037import org.apache.hadoop.hbase.CallQueueTooBigException; 038import org.apache.hadoop.hbase.DoNotRetryIOException; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.RegionLocations; 042import org.apache.hadoop.hbase.RetryImmediatelyException; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 046import org.apache.hadoop.hbase.client.coprocessor.Batch; 047import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 048import org.apache.hadoop.hbase.trace.TraceUtil; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.htrace.core.Tracer; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 057 058import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 060 061/** 062 * The context, and return value, for a single submit/submitAll call. 063 * Note on how this class (one AP submit) works. Initially, all requests are split into groups 064 * by server; request is sent to each server in parallel; the RPC calls are not async so a 065 * thread per server is used. Every time some actions fail, regions/locations might have 066 * changed, so we re-group them by server and region again and send these groups in parallel 067 * too. The result, in case of retries, is a "tree" of threads, with parent exiting after 068 * scheduling children. This is why lots of code doesn't require any synchronization. 069 */ 070@InterfaceAudience.Private 071class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { 072 073 private static final Logger LOG = LoggerFactory.getLogger(AsyncRequestFutureImpl.class); 074 075 private RetryingTimeTracker tracker; 076 077 /** 078 * Runnable (that can be submitted to thread pool) that waits for when it's time 079 * to issue replica calls, finds region replicas, groups the requests by replica and 080 * issues the calls (on separate threads, via sendMultiAction). 081 * This is done on a separate thread because we don't want to wait on user thread for 082 * our asynchronous call, and usually we have to wait before making replica calls. 083 */ 084 private final class ReplicaCallIssuingRunnable implements Runnable { 085 private final long startTime; 086 private final List<Action> initialActions; 087 088 public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) { 089 this.initialActions = initialActions; 090 this.startTime = startTime; 091 } 092 093 @Override 094 public void run() { 095 boolean done = false; 096 if (asyncProcess.primaryCallTimeoutMicroseconds > 0) { 097 try { 098 done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds); 099 } catch (InterruptedException ex) { 100 LOG.error("Replica thread interrupted - no replica calls {}", ex.getMessage()); 101 return; 102 } 103 } 104 if (done) return; // Done within primary timeout 105 Map<ServerName, MultiAction> actionsByServer = new HashMap<>(); 106 List<Action> unknownLocActions = new ArrayList<>(); 107 if (replicaGetIndices == null) { 108 for (int i = 0; i < results.length; ++i) { 109 addReplicaActions(i, actionsByServer, unknownLocActions); 110 } 111 } else { 112 for (int replicaGetIndice : replicaGetIndices) { 113 addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions); 114 } 115 } 116 if (!actionsByServer.isEmpty()) { 117 sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); 118 } 119 if (!unknownLocActions.isEmpty()) { 120 actionsByServer = new HashMap<>(); 121 for (Action action : unknownLocActions) { 122 addReplicaActionsAgain(action, actionsByServer); 123 } 124 // Some actions may have completely failed, they are handled inside addAgain. 125 if (!actionsByServer.isEmpty()) { 126 sendMultiAction(actionsByServer, 1, null, true); 127 } 128 } 129 } 130 131 /** 132 * Add replica actions to action map by server. 133 * @param index Index of the original action. 134 * @param actionsByServer The map by server to add it to. 135 */ 136 private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer, 137 List<Action> unknownReplicaActions) { 138 if (results[index] != null) return; // opportunistic. Never goes from non-null to null. 139 Action action = initialActions.get(index); 140 RegionLocations loc = findAllLocationsOrFail(action, true); 141 if (loc == null) return; 142 HRegionLocation[] locs = loc.getRegionLocations(); 143 if (locs.length == 1) { 144 LOG.warn("No replicas found for {}", action.getAction()); 145 return; 146 } 147 synchronized (replicaResultLock) { 148 // Don't run replica calls if the original has finished. We could do it e.g. if 149 // original has already failed before first replica call (unlikely given retries), 150 // but that would require additional synchronization w.r.t. returning to caller. 151 if (results[index] != null) return; 152 // We set the number of calls here. After that any path must call setResult/setError. 153 // True even for replicas that are not found - if we refuse to send we MUST set error. 154 updateResult(index, new ReplicaResultState(locs.length)); 155 } 156 for (int i = 1; i < locs.length; ++i) { 157 Action replicaAction = new Action(action, i); 158 if (locs[i] != null) { 159 asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), 160 replicaAction, actionsByServer, nonceGroup); 161 } else { 162 unknownReplicaActions.add(replicaAction); 163 } 164 } 165 } 166 167 private void addReplicaActionsAgain( 168 Action action, Map<ServerName, MultiAction> actionsByServer) { 169 if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) { 170 throw new AssertionError("Cannot have default replica here"); 171 } 172 HRegionLocation loc = getReplicaLocationOrFail(action); 173 if (loc == null) return; 174 asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), 175 action, actionsByServer, nonceGroup); 176 } 177 } 178 179 /** 180 * Runnable (that can be submitted to thread pool) that submits MultiAction to a 181 * single server. The server call is synchronous, therefore we do it on a thread pool. 182 */ 183 @VisibleForTesting 184 final class SingleServerRequestRunnable implements Runnable { 185 private final MultiAction multiAction; 186 private final int numAttempt; 187 private final ServerName server; 188 private final Set<CancellableRegionServerCallable> callsInProgress; 189 @VisibleForTesting 190 SingleServerRequestRunnable( 191 MultiAction multiAction, int numAttempt, ServerName server, 192 Set<CancellableRegionServerCallable> callsInProgress) { 193 this.multiAction = multiAction; 194 this.numAttempt = numAttempt; 195 this.server = server; 196 this.callsInProgress = callsInProgress; 197 } 198 199 @Override 200 public void run() { 201 AbstractResponse res = null; 202 CancellableRegionServerCallable callable = currentCallable; 203 try { 204 // setup the callable based on the actions, if we don't have one already from the request 205 if (callable == null) { 206 callable = createCallable(server, tableName, multiAction); 207 } 208 RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable,rpcTimeout); 209 try { 210 if (callsInProgress != null) { 211 callsInProgress.add(callable); 212 } 213 res = caller.callWithoutRetries(callable, operationTimeout); 214 if (res == null) { 215 // Cancelled 216 return; 217 } 218 } catch (IOException e) { 219 // The service itself failed . It may be an error coming from the communication 220 // layer, but, as well, a functional error raised by the server. 221 receiveGlobalFailure(multiAction, server, numAttempt, e); 222 return; 223 } catch (Throwable t) { 224 // This should not happen. Let's log & retry anyway. 225 LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected." + 226 " Retrying. Server=" + server + ", tableName=" + tableName, t); 227 receiveGlobalFailure(multiAction, server, numAttempt, t); 228 return; 229 } 230 if (res.type() == AbstractResponse.ResponseType.MULTI) { 231 // Normal case: we received an answer from the server, and it's not an exception. 232 receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt); 233 } else { 234 if (results != null) { 235 SingleResponse singleResponse = (SingleResponse) res; 236 updateResult(0, singleResponse.getEntry()); 237 } 238 decActionCounter(1); 239 } 240 } catch (Throwable t) { 241 // Something really bad happened. We are on the send thread that will now die. 242 LOG.error("id=" + asyncProcess.id + " error for " + tableName + " processing " + server, t); 243 throw new RuntimeException(t); 244 } finally { 245 asyncProcess.decTaskCounters(multiAction.getRegions(), server); 246 if (callsInProgress != null && callable != null && res != null) { 247 callsInProgress.remove(callable); 248 } 249 } 250 } 251 } 252 253 private final Batch.Callback<CResult> callback; 254 private final BatchErrors errors; 255 private final ConnectionImplementation.ServerErrorTracker errorsByServer; 256 private final ExecutorService pool; 257 private final Set<CancellableRegionServerCallable> callsInProgress; 258 259 260 private final TableName tableName; 261 private final AtomicLong actionsInProgress = new AtomicLong(-1); 262 /** 263 * The lock controls access to results. It is only held when populating results where 264 * there might be several callers (eventual consistency gets). For other requests, 265 * there's one unique call going on per result index. 266 */ 267 private final Object replicaResultLock = new Object(); 268 /** 269 * Result array. Null if results are not needed. Otherwise, each index corresponds to 270 * the action index in initial actions submitted. For most request types, has null-s for 271 * requests that are not done, and result/exception for those that are done. 272 * For eventual-consistency gets, initially the same applies; at some point, replica calls 273 * might be started, and ReplicaResultState is put at the corresponding indices. The 274 * returning calls check the type to detect when this is the case. After all calls are done, 275 * ReplicaResultState-s are replaced with results for the user. 276 */ 277 private final Object[] results; 278 /** 279 * Indices of replica gets in results. If null, all or no actions are replica-gets. 280 */ 281 private final int[] replicaGetIndices; 282 private final boolean hasAnyReplicaGets; 283 private final long nonceGroup; 284 private final CancellableRegionServerCallable currentCallable; 285 private final int operationTimeout; 286 private final int rpcTimeout; 287 private final AsyncProcess asyncProcess; 288 289 /** 290 * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only 291 * used to make logging more clear, we don't actually care why we don't retry. 292 */ 293 public enum Retry { 294 YES, 295 NO_LOCATION_PROBLEM, 296 NO_NOT_RETRIABLE, 297 NO_RETRIES_EXHAUSTED, 298 NO_OTHER_SUCCEEDED 299 } 300 301 /** Sync point for calls to multiple replicas for the same user request (Get). 302 * Created and put in the results array (we assume replica calls require results) when 303 * the replica calls are launched. See results for details of this process. 304 * POJO, all fields are public. To modify them, the object itself is locked. */ 305 private static class ReplicaResultState { 306 public ReplicaResultState(int callCount) { 307 this.callCount = callCount; 308 } 309 310 /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */ 311 int callCount; 312 /** Errors for which it is not decided whether we will report them to user. If one of the 313 * calls succeeds, we will discard the errors that may have happened in the other calls. */ 314 BatchErrors replicaErrors = null; 315 316 @Override 317 public String toString() { 318 return "[call count " + callCount + "; errors " + replicaErrors + "]"; 319 } 320 } 321 322 public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, 323 long nonceGroup, AsyncProcess asyncProcess) { 324 this.pool = task.getPool(); 325 this.callback = task.getCallback(); 326 this.nonceGroup = nonceGroup; 327 this.tableName = task.getTableName(); 328 this.actionsInProgress.set(actions.size()); 329 if (task.getResults() == null) { 330 results = task.getNeedResults() ? new Object[actions.size()] : null; 331 } else { 332 if (task.getResults().length != actions.size()) { 333 throw new AssertionError("results.length"); 334 } 335 this.results = task.getResults(); 336 for (int i = 0; i != this.results.length; ++i) { 337 results[i] = null; 338 } 339 } 340 List<Integer> replicaGetIndices = null; 341 boolean hasAnyReplicaGets = false; 342 if (results != null) { 343 // Check to see if any requests might require replica calls. 344 // We expect that many requests will consist of all or no multi-replica gets; in such 345 // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will 346 // store the list of action indexes for which replica gets are possible, and set 347 // hasAnyReplicaGets to true. 348 boolean hasAnyNonReplicaReqs = false; 349 int posInList = 0; 350 for (Action action : actions) { 351 boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction()); 352 if (isReplicaGet) { 353 hasAnyReplicaGets = true; 354 if (hasAnyNonReplicaReqs) { // Mixed case 355 if (replicaGetIndices == null) { 356 replicaGetIndices = new ArrayList<>(actions.size() - 1); 357 } 358 replicaGetIndices.add(posInList); 359 } 360 } else if (!hasAnyNonReplicaReqs) { 361 // The first non-multi-replica request in the action list. 362 hasAnyNonReplicaReqs = true; 363 if (posInList > 0) { 364 // Add all the previous requests to the index lists. We know they are all 365 // replica-gets because this is the first non-multi-replica request in the list. 366 replicaGetIndices = new ArrayList<>(actions.size() - 1); 367 for (int i = 0; i < posInList; ++i) { 368 replicaGetIndices.add(i); 369 } 370 } 371 } 372 ++posInList; 373 } 374 } 375 this.hasAnyReplicaGets = hasAnyReplicaGets; 376 if (replicaGetIndices != null) { 377 this.replicaGetIndices = new int[replicaGetIndices.size()]; 378 int i = 0; 379 for (Integer el : replicaGetIndices) { 380 this.replicaGetIndices[i++] = el; 381 } 382 } else { 383 this.replicaGetIndices = null; 384 } 385 this.callsInProgress = !hasAnyReplicaGets ? null : 386 Collections.newSetFromMap( 387 new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>()); 388 this.asyncProcess = asyncProcess; 389 this.errorsByServer = createServerErrorTracker(); 390 this.errors = new BatchErrors(); 391 this.operationTimeout = task.getOperationTimeout(); 392 this.rpcTimeout = task.getRpcTimeout(); 393 this.currentCallable = task.getCallable(); 394 if (task.getCallable() == null) { 395 tracker = new RetryingTimeTracker().start(); 396 } 397 } 398 399 @VisibleForTesting 400 protected Set<CancellableRegionServerCallable> getCallsInProgress() { 401 return callsInProgress; 402 } 403 404 @VisibleForTesting 405 SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server, 406 Set<CancellableRegionServerCallable> callsInProgress) { 407 return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress); 408 } 409 410 /** 411 * Group a list of actions per region servers, and send them. 412 * 413 * @param currentActions - the list of row to submit 414 * @param numAttempt - the current numAttempt (first attempt is 1) 415 */ 416 void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) { 417 Map<ServerName, MultiAction> actionsByServer = new HashMap<>(); 418 419 boolean isReplica = false; 420 List<Action> unknownReplicaActions = null; 421 for (Action action : currentActions) { 422 RegionLocations locs = findAllLocationsOrFail(action, true); 423 if (locs == null) continue; 424 boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); 425 if (isReplica && !isReplicaAction) { 426 // This is the property of the current implementation, not a requirement. 427 throw new AssertionError("Replica and non-replica actions in the same retry"); 428 } 429 isReplica = isReplicaAction; 430 HRegionLocation loc = locs.getRegionLocation(action.getReplicaId()); 431 if (loc == null || loc.getServerName() == null) { 432 if (isReplica) { 433 if (unknownReplicaActions == null) { 434 unknownReplicaActions = new ArrayList<>(1); 435 } 436 unknownReplicaActions.add(action); 437 } else { 438 // TODO: relies on primary location always being fetched 439 manageLocationError(action, null); 440 } 441 } else { 442 byte[] regionName = loc.getRegionInfo().getRegionName(); 443 AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); 444 } 445 } 446 boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets); 447 boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty(); 448 449 if (!actionsByServer.isEmpty()) { 450 // If this is a first attempt to group and send, no replicas, we need replica thread. 451 sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown) 452 ? currentActions : null, numAttempt > 1 && !hasUnknown); 453 } 454 455 if (hasUnknown) { 456 actionsByServer = new HashMap<>(); 457 for (Action action : unknownReplicaActions) { 458 HRegionLocation loc = getReplicaLocationOrFail(action); 459 if (loc == null) continue; 460 byte[] regionName = loc.getRegionInfo().getRegionName(); 461 AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); 462 } 463 if (!actionsByServer.isEmpty()) { 464 sendMultiAction( 465 actionsByServer, numAttempt, doStartReplica ? currentActions : null, true); 466 } 467 } 468 } 469 470 private HRegionLocation getReplicaLocationOrFail(Action action) { 471 // We are going to try get location once again. For each action, we'll do it once 472 // from cache, because the previous calls in the loop might populate it. 473 int replicaId = action.getReplicaId(); 474 RegionLocations locs = findAllLocationsOrFail(action, true); 475 if (locs == null) return null; // manageError already called 476 HRegionLocation loc = locs.getRegionLocation(replicaId); 477 if (loc == null || loc.getServerName() == null) { 478 locs = findAllLocationsOrFail(action, false); 479 if (locs == null) return null; // manageError already called 480 loc = locs.getRegionLocation(replicaId); 481 } 482 if (loc == null || loc.getServerName() == null) { 483 manageLocationError(action, null); 484 return null; 485 } 486 return loc; 487 } 488 489 private void manageLocationError(Action action, Exception ex) { 490 String msg = "Cannot get replica " + action.getReplicaId() 491 + " location for " + action.getAction(); 492 LOG.error(msg); 493 if (ex == null) { 494 ex = new IOException(msg); 495 } 496 manageError(action.getOriginalIndex(), action.getAction(), 497 Retry.NO_LOCATION_PROBLEM, ex, null); 498 } 499 500 private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) { 501 if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id + 502 ", row cannot be null"); 503 RegionLocations loc = null; 504 try { 505 loc = asyncProcess.connection.locateRegion( 506 tableName, action.getAction().getRow(), useCache, true, action.getReplicaId()); 507 } catch (IOException ex) { 508 manageLocationError(action, ex); 509 } 510 return loc; 511 } 512 513 /** 514 * Send a multi action structure to the servers, after a delay depending on the attempt 515 * number. Asynchronous. 516 * 517 * @param actionsByServer the actions structured by regions 518 * @param numAttempt the attempt number. 519 * @param actionsForReplicaThread original actions for replica thread; null on non-first call. 520 */ 521 void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, 522 int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) { 523 // Run the last item on the same thread if we are already on a send thread. 524 // We hope most of the time it will be the only item, so we can cut down on threads. 525 int actionsRemaining = actionsByServer.size(); 526 // This iteration is by server (the HRegionLocation comparator is by server portion only). 527 for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) { 528 ServerName server = e.getKey(); 529 MultiAction multiAction = e.getValue(); 530 Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction, 531 numAttempt); 532 // make sure we correctly count the number of runnables before we try to reuse the send 533 // thread, in case we had to split the request into different runnables because of backoff 534 if (runnables.size() > actionsRemaining) { 535 actionsRemaining = runnables.size(); 536 } 537 538 // run all the runnables 539 // HBASE-17475: Do not reuse the thread after stack reach a certain depth to prevent stack overflow 540 // for now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to control the depth 541 for (Runnable runnable : runnables) { 542 if ((--actionsRemaining == 0) && reuseThread 543 && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) { 544 runnable.run(); 545 } else { 546 try { 547 pool.submit(runnable); 548 } catch (Throwable t) { 549 if (t instanceof RejectedExecutionException) { 550 // This should never happen. But as the pool is provided by the end user, 551 // let's secure this a little. 552 LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + 553 " Server=" + server.getServerName(), t); 554 } else { 555 // see #HBASE-14359 for more details 556 LOG.warn("Caught unexpected exception/error: ", t); 557 } 558 asyncProcess.decTaskCounters(multiAction.getRegions(), server); 559 // We're likely to fail again, but this will increment the attempt counter, 560 // so it will finish. 561 receiveGlobalFailure(multiAction, server, numAttempt, t); 562 } 563 } 564 } 565 } 566 567 if (actionsForReplicaThread != null) { 568 startWaitingForReplicaCalls(actionsForReplicaThread); 569 } 570 } 571 572 private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server, 573 MultiAction multiAction, 574 int numAttempt) { 575 // no stats to manage, just do the standard action 576 if (asyncProcess.connection.getStatisticsTracker() == null) { 577 if (asyncProcess.connection.getConnectionMetrics() != null) { 578 asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); 579 } 580 asyncProcess.incTaskCounters(multiAction.getRegions(), server); 581 SingleServerRequestRunnable runnable = createSingleServerRequest( 582 multiAction, numAttempt, server, callsInProgress); 583 Tracer tracer = Tracer.curThreadTracer(); 584 585 if (tracer == null) { 586 return Collections.singletonList(runnable); 587 } else { 588 return Collections.singletonList(tracer.wrap(runnable, "AsyncProcess.sendMultiAction")); 589 } 590 } 591 592 // group the actions by the amount of delay 593 Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size()); 594 595 // split up the actions 596 for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) { 597 Long backoff = getBackoff(server, e.getKey()); 598 DelayingRunner runner = actions.get(backoff); 599 if (runner == null) { 600 actions.put(backoff, new DelayingRunner(backoff, e)); 601 } else { 602 runner.add(e); 603 } 604 } 605 606 List<Runnable> toReturn = new ArrayList<>(actions.size()); 607 for (DelayingRunner runner : actions.values()) { 608 asyncProcess.incTaskCounters(runner.getActions().getRegions(), server); 609 String traceText = "AsyncProcess.sendMultiAction"; 610 Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress); 611 // use a delay runner only if we need to sleep for some time 612 if (runner.getSleepTime() > 0) { 613 runner.setRunner(runnable); 614 traceText = "AsyncProcess.clientBackoff.sendMultiAction"; 615 runnable = runner; 616 if (asyncProcess.connection.getConnectionMetrics() != null) { 617 asyncProcess.connection.getConnectionMetrics().incrDelayRunners(); 618 asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime()); 619 } 620 } else { 621 if (asyncProcess.connection.getConnectionMetrics() != null) { 622 asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); 623 } 624 } 625 runnable = TraceUtil.wrap(runnable, traceText); 626 toReturn.add(runnable); 627 628 } 629 return toReturn; 630 } 631 632 /** 633 * @param server server location where the target region is hosted 634 * @param regionName name of the region which we are going to write some data 635 * @return the amount of time the client should wait until it submit a request to the 636 * specified server and region 637 */ 638 private Long getBackoff(ServerName server, byte[] regionName) { 639 ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker(); 640 ServerStatistics stats = tracker.getStats(server); 641 return asyncProcess.connection.getBackoffPolicy() 642 .getBackoffTime(server, regionName, stats); 643 } 644 645 /** 646 * Starts waiting to issue replica calls on a different thread; or issues them immediately. 647 */ 648 private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) { 649 long startTime = EnvironmentEdgeManager.currentTime(); 650 ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable( 651 actionsForReplicaThread, startTime); 652 if (asyncProcess.primaryCallTimeoutMicroseconds == 0) { 653 // Start replica calls immediately. 654 replicaRunnable.run(); 655 } else { 656 // Start the thread that may kick off replica gets. 657 // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea. 658 try { 659 pool.submit(replicaRunnable); 660 } catch (RejectedExecutionException ree) { 661 LOG.warn("id=" + asyncProcess.id + " replica task rejected by pool; no replica calls", ree); 662 } 663 } 664 } 665 666 /** 667 * Check that we can retry acts accordingly: logs, set the error status. 668 * 669 * @param originalIndex the position in the list sent 670 * @param row the row 671 * @param canRetry if false, we won't retry whatever the settings. 672 * @param throwable the throwable, if any (can be null) 673 * @param server the location, if any (can be null) 674 * @return true if the action can be retried, false otherwise. 675 */ 676 Retry manageError(int originalIndex, Row row, Retry canRetry, 677 Throwable throwable, ServerName server) { 678 if (canRetry == Retry.YES 679 && throwable != null && throwable instanceof DoNotRetryIOException) { 680 canRetry = Retry.NO_NOT_RETRIABLE; 681 } 682 683 if (canRetry != Retry.YES) { 684 // Batch.Callback<Res> was not called on failure in 0.94. We keep this. 685 setError(originalIndex, row, throwable, server); 686 } else if (isActionComplete(originalIndex, row)) { 687 canRetry = Retry.NO_OTHER_SUCCEEDED; 688 } 689 return canRetry; 690 } 691 692 /** 693 * Resubmit all the actions from this multiaction after a failure. 694 * 695 * @param rsActions the actions still to do from the initial list 696 * @param server the destination 697 * @param numAttempt the number of attempts so far 698 * @param t the throwable (if any) that caused the resubmit 699 */ 700 private void receiveGlobalFailure( 701 MultiAction rsActions, ServerName server, int numAttempt, Throwable t) { 702 errorsByServer.reportServerError(server); 703 Retry canRetry = errorsByServer.canTryMore(numAttempt) 704 ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; 705 706 cleanServerCache(server, t); 707 int failed = 0; 708 int stopped = 0; 709 List<Action> toReplay = new ArrayList<>(); 710 for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) { 711 byte[] regionName = e.getKey(); 712 byte[] row = e.getValue().get(0).getAction().getRow(); 713 // Do not use the exception for updating cache because it might be coming from 714 // any of the regions in the MultiAction. 715 updateCachedLocations(server, regionName, row, 716 ClientExceptionsUtil.isMetaClearingException(t) ? null : t); 717 for (Action action : e.getValue()) { 718 Retry retry = manageError( 719 action.getOriginalIndex(), action.getAction(), canRetry, t, server); 720 if (retry == Retry.YES) { 721 toReplay.add(action); 722 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { 723 ++stopped; 724 } else { 725 ++failed; 726 } 727 } 728 } 729 730 if (toReplay.isEmpty()) { 731 logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped); 732 } else { 733 resubmit(server, toReplay, numAttempt, rsActions.size(), t); 734 } 735 } 736 737 /** 738 * Log as much info as possible, and, if there is something to replay, 739 * submit it again after a back off sleep. 740 */ 741 private void resubmit(ServerName oldServer, List<Action> toReplay, 742 int numAttempt, int failureCount, Throwable throwable) { 743 // We have something to replay. We're going to sleep a little before. 744 745 // We have two contradicting needs here: 746 // 1) We want to get the new location after having slept, as it may change. 747 // 2) We want to take into account the location when calculating the sleep time. 748 // 3) If all this is just because the response needed to be chunked try again FAST. 749 // It should be possible to have some heuristics to take the right decision. Short term, 750 // we go for one. 751 boolean retryImmediately = throwable instanceof RetryImmediatelyException; 752 int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; 753 long backOffTime; 754 if (retryImmediately) { 755 backOffTime = 0; 756 } else if (throwable instanceof CallQueueTooBigException) { 757 // Give a special check on CQTBE, see #HBASE-17114 758 backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE); 759 } else { 760 backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause); 761 } 762 if (numAttempt > asyncProcess.startLogErrorsCnt) { 763 // We use this value to have some logs when we have multiple failures, but not too many 764 // logs, as errors are to be expected when a region moves, splits and so on 765 LOG.info(createLog(numAttempt, failureCount, toReplay.size(), 766 oldServer, throwable, backOffTime, true, null, -1, -1)); 767 } 768 769 try { 770 if (backOffTime > 0) { 771 Thread.sleep(backOffTime); 772 } 773 } catch (InterruptedException e) { 774 LOG.warn("#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e); 775 Thread.currentThread().interrupt(); 776 return; 777 } 778 779 groupAndSendMultiAction(toReplay, nextAttemptNumber); 780 } 781 782 private void logNoResubmit(ServerName oldServer, int numAttempt, 783 int failureCount, Throwable throwable, int failed, int stopped) { 784 if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) { 785 String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString(); 786 String logMessage = createLog(numAttempt, failureCount, 0, oldServer, 787 throwable, -1, false, timeStr, failed, stopped); 788 if (failed != 0) { 789 // Only log final failures as warning 790 LOG.warn(logMessage); 791 } else { 792 LOG.info(logMessage); 793 } 794 } 795 } 796 797 /** 798 * Called when we receive the result of a server query. 799 * 800 * @param multiAction - the multiAction we sent 801 * @param server - the location. It's used as a server name. 802 * @param responses - the response, if any 803 * @param numAttempt - the attempt 804 */ 805 private void receiveMultiAction(MultiAction multiAction, 806 ServerName server, MultiResponse responses, int numAttempt) { 807 assert responses != null; 808 809 Map<byte[], MultiResponse.RegionResult> results = responses.getResults(); 810 updateStats(server, results); 811 812 // Success or partial success 813 // Analyze detailed results. We can still have individual failures to be redo. 814 // two specific throwables are managed: 815 // - DoNotRetryIOException: we continue to retry for other actions 816 // - RegionMovedException: we update the cache with the new region location 817 818 List<Action> toReplay = new ArrayList<>(); 819 Throwable lastException = null; 820 int failureCount = 0; 821 int failed = 0; 822 int stopped = 0; 823 Retry retry = null; 824 // Go by original action. 825 for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) { 826 byte[] regionName = regionEntry.getKey(); 827 828 Throwable regionException = responses.getExceptions().get(regionName); 829 if (regionException != null) { 830 cleanServerCache(server, regionException); 831 } 832 833 Map<Integer, Object> regionResults = 834 results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap(); 835 boolean regionFailureRegistered = false; 836 for (Action sentAction : regionEntry.getValue()) { 837 Object result = regionResults.get(sentAction.getOriginalIndex()); 838 if (result == null) { 839 if (regionException == null) { 840 LOG.error("Server sent us neither results nor exceptions for " 841 + Bytes.toStringBinary(regionName) 842 + ", numAttempt:" + numAttempt); 843 regionException = new RuntimeException("Invalid response"); 844 } 845 // If the row operation encounters the region-lever error, the exception of action may be 846 // null. 847 result = regionException; 848 } 849 // Failure: retry if it's make sense else update the errors lists 850 if (result instanceof Throwable) { 851 Throwable actionException = (Throwable) result; 852 Row row = sentAction.getAction(); 853 lastException = regionException != null ? regionException 854 : ClientExceptionsUtil.findException(actionException); 855 // Register corresponding failures once per server/once per region. 856 if (!regionFailureRegistered) { 857 regionFailureRegistered = true; 858 updateCachedLocations(server, regionName, row.getRow(), actionException); 859 } 860 if (retry == null) { 861 errorsByServer.reportServerError(server); 862 // We determine canRetry only once for all calls, after reporting server failure. 863 retry = errorsByServer.canTryMore(numAttempt) ? 864 Retry.YES : Retry.NO_RETRIES_EXHAUSTED; 865 } 866 ++failureCount; 867 switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException, 868 server)) { 869 case YES: 870 toReplay.add(sentAction); 871 break; 872 case NO_OTHER_SUCCEEDED: 873 ++stopped; 874 break; 875 default: 876 ++failed; 877 break; 878 } 879 } else { 880 invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result); 881 setResult(sentAction, result); 882 } 883 } 884 } 885 if (toReplay.isEmpty()) { 886 logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped); 887 } else { 888 resubmit(server, toReplay, numAttempt, failureCount, lastException); 889 } 890 } 891 892 private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row, 893 Throwable rowException) { 894 if (tableName == null) { 895 return; 896 } 897 try { 898 asyncProcess.connection 899 .updateCachedLocations(tableName, regionName, row, rowException, server); 900 } catch (Throwable ex) { 901 // That should never happen, but if it did, we want to make sure 902 // we still process errors 903 LOG.error("Couldn't update cached region locations: " + ex); 904 } 905 } 906 907 private void invokeCallBack(byte[] regionName, byte[] row, CResult result) { 908 if (callback != null) { 909 try { 910 //noinspection unchecked 911 // TODO: would callback expect a replica region name if it gets one? 912 this.callback.update(regionName, row, result); 913 } catch (Throwable t) { 914 LOG.error("User callback threw an exception for " 915 + Bytes.toStringBinary(regionName) + ", ignoring", t); 916 } 917 } 918 } 919 920 private void cleanServerCache(ServerName server, Throwable regionException) { 921 if (ClientExceptionsUtil.isMetaClearingException(regionException)) { 922 // We want to make sure to clear the cache in case there were location-related exceptions. 923 // We don't to clear the cache for every possible exception that comes through, however. 924 asyncProcess.connection.clearCaches(server); 925 } 926 } 927 928 @VisibleForTesting 929 protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) { 930 boolean metrics = asyncProcess.connection.getConnectionMetrics() != null; 931 boolean stats = asyncProcess.connection.getStatisticsTracker() != null; 932 if (!stats && !metrics) { 933 return; 934 } 935 for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) { 936 byte[] regionName = regionStats.getKey(); 937 ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat(); 938 if (stat == null) { 939 LOG.error("No ClientProtos.RegionLoadStats found for server=" + server 940 + ", region=" + Bytes.toStringBinary(regionName)); 941 continue; 942 } 943 RegionLoadStats regionLoadstats = ProtobufUtil.createRegionLoadStats(stat); 944 ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(), server, 945 regionName, regionLoadstats); 946 ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(), 947 server, regionName, regionLoadstats); 948 } 949 } 950 951 952 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, 953 Throwable error, long backOffTime, boolean willRetry, String startTime, 954 int failed, int stopped) { 955 StringBuilder sb = new StringBuilder(); 956 sb.append("id=").append(asyncProcess.id).append(", table=").append(tableName). 957 append(", attempt=").append(numAttempt).append("/").append(asyncProcess.numTries). 958 append(", "); 959 960 if (failureCount > 0 || error != null){ 961 sb.append("failureCount=").append(failureCount).append("ops").append(", last exception="). 962 append(error); 963 } else { 964 sb.append("succeeded"); 965 } 966 967 sb.append(" on ").append(sn).append(", tracking started ").append(startTime); 968 969 if (willRetry) { 970 sb.append(", retrying after=").append(backOffTime).append("ms"). 971 append(", operationsToReplay=").append(replaySize); 972 } else if (failureCount > 0) { 973 if (stopped > 0) { 974 sb.append("; NOT retrying, stopped=").append(stopped). 975 append(" because successful operation on other replica"); 976 } 977 if (failed > 0) { 978 sb.append("; NOT retrying, failed=").append(failed).append(" -- final attempt!"); 979 } 980 } 981 982 return sb.toString(); 983 } 984 985 /** 986 * Sets the non-error result from a particular action. 987 * @param action Action (request) that the server responded to. 988 * @param result The result. 989 */ 990 private void setResult(Action action, Object result) { 991 if (result == null) { 992 throw new RuntimeException("Result cannot be null"); 993 } 994 ReplicaResultState state = null; 995 boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); 996 int index = action.getOriginalIndex(); 997 if (results == null) { 998 decActionCounter(index); 999 return; // Simple case, no replica requests. 1000 } 1001 state = trySetResultSimple(index, action.getAction(), false, result, null, isStale); 1002 if (state == null) { 1003 return; // Simple case, no replica requests. 1004 } 1005 // At this point we know that state is set to replica tracking class. 1006 // It could be that someone else is also looking at it; however, we know there can 1007 // only be one state object, and only one thread can set callCount to 0. Other threads 1008 // will either see state with callCount 0 after locking it; or will not see state at all 1009 // we will replace it with the result. 1010 synchronized (state) { 1011 if (state.callCount == 0) { 1012 return; // someone already set the result 1013 } 1014 state.callCount = 0; 1015 } 1016 synchronized (replicaResultLock) { 1017 if (results[index] != state) { 1018 throw new AssertionError("We set the callCount but someone else replaced the result"); 1019 } 1020 updateResult(index, result); 1021 } 1022 1023 decActionCounter(index); 1024 } 1025 1026 /** 1027 * Sets the error from a particular action. 1028 * @param index Original action index. 1029 * @param row Original request. 1030 * @param throwable The resulting error. 1031 * @param server The source server. 1032 */ 1033 private void setError(int index, Row row, Throwable throwable, ServerName server) { 1034 ReplicaResultState state = null; 1035 if (results == null) { 1036 // Note that we currently cannot have replica requests with null results. So it shouldn't 1037 // happen that multiple replica calls will call dAC for same actions with results == null. 1038 // Only one call per action should be present in this case. 1039 errors.add(throwable, row, server); 1040 decActionCounter(index); 1041 return; // Simple case, no replica requests. 1042 } 1043 state = trySetResultSimple(index, row, true, throwable, server, false); 1044 if (state == null) { 1045 return; // Simple case, no replica requests. 1046 } 1047 BatchErrors target = null; // Error will be added to final errors, or temp replica errors. 1048 boolean isActionDone = false; 1049 synchronized (state) { 1050 switch (state.callCount) { 1051 case 0: return; // someone already set the result 1052 case 1: { // All calls failed, we are the last error. 1053 target = errors; 1054 isActionDone = true; 1055 break; 1056 } 1057 default: { 1058 assert state.callCount > 1; 1059 if (state.replicaErrors == null) { 1060 state.replicaErrors = new BatchErrors(); 1061 } 1062 target = state.replicaErrors; 1063 break; 1064 } 1065 } 1066 --state.callCount; 1067 } 1068 target.add(throwable, row, server); 1069 if (isActionDone) { 1070 if (state.replicaErrors != null) { // last call, no need to lock 1071 errors.merge(state.replicaErrors); 1072 } 1073 // See setResult for explanations. 1074 synchronized (replicaResultLock) { 1075 if (results[index] != state) { 1076 throw new AssertionError("We set the callCount but someone else replaced the result"); 1077 } 1078 updateResult(index, throwable); 1079 } 1080 decActionCounter(index); 1081 } 1082 } 1083 1084 /** 1085 * Checks if the action is complete; used on error to prevent needless retries. 1086 * Does not synchronize, assuming element index/field accesses are atomic. 1087 * This is an opportunistic optimization check, doesn't have to be strict. 1088 * @param index Original action index. 1089 * @param row Original request. 1090 */ 1091 private boolean isActionComplete(int index, Row row) { 1092 if (!AsyncProcess.isReplicaGet(row)) return false; 1093 Object resObj = results[index]; 1094 return (resObj != null) && (!(resObj instanceof ReplicaResultState) 1095 || ((ReplicaResultState)resObj).callCount == 0); 1096 } 1097 1098 /** 1099 * Tries to set the result or error for a particular action as if there were no replica calls. 1100 * @return null if successful; replica state if there were in fact replica calls. 1101 */ 1102 private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, 1103 Object result, ServerName server, boolean isFromReplica) { 1104 Object resObj = null; 1105 if (!AsyncProcess.isReplicaGet(row)) { 1106 if (isFromReplica) { 1107 throw new AssertionError("Unexpected stale result for " + row); 1108 } 1109 updateResult(index, result); 1110 } else { 1111 synchronized (replicaResultLock) { 1112 resObj = results[index]; 1113 if (resObj == null) { 1114 if (isFromReplica) { 1115 throw new AssertionError("Unexpected stale result for " + row); 1116 } 1117 updateResult(index, result); 1118 } 1119 } 1120 } 1121 1122 ReplicaResultState rrs = 1123 (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null; 1124 if (rrs == null && isError) { 1125 // The resObj is not replica state (null or already set). 1126 errors.add((Throwable)result, row, server); 1127 } 1128 1129 if (resObj == null) { 1130 // resObj is null - no replica calls were made. 1131 decActionCounter(index); 1132 return null; 1133 } 1134 return rrs; 1135 } 1136 1137 private void decActionCounter(int index) { 1138 long actionsRemaining = actionsInProgress.decrementAndGet(); 1139 if (actionsRemaining < 0) { 1140 String error = buildDetailedErrorMsg("Incorrect actions in progress", index); 1141 throw new AssertionError(error); 1142 } else if (actionsRemaining == 0) { 1143 synchronized (actionsInProgress) { 1144 actionsInProgress.notifyAll(); 1145 } 1146 } 1147 } 1148 1149 private String buildDetailedErrorMsg(String string, int index) { 1150 StringBuilder error = new StringBuilder(128); 1151 error.append(string).append("; called for ").append(index).append(", actionsInProgress ") 1152 .append(actionsInProgress.get()).append("; replica gets: "); 1153 if (replicaGetIndices != null) { 1154 for (int i = 0; i < replicaGetIndices.length; ++i) { 1155 error.append(replicaGetIndices[i]).append(", "); 1156 } 1157 } else { 1158 error.append(hasAnyReplicaGets ? "all" : "none"); 1159 } 1160 error.append("; results "); 1161 if (results != null) { 1162 for (int i = 0; i < results.length; ++i) { 1163 Object o = results[i]; 1164 error.append(((o == null) ? "null" : o.toString())).append(", "); 1165 } 1166 } 1167 return error.toString(); 1168 } 1169 1170 @Override 1171 public void waitUntilDone() throws InterruptedIOException { 1172 try { 1173 waitUntilDone(Long.MAX_VALUE); 1174 } catch (InterruptedException iex) { 1175 throw new InterruptedIOException(iex.getMessage()); 1176 } finally { 1177 if (callsInProgress != null) { 1178 for (CancellableRegionServerCallable clb : callsInProgress) { 1179 clb.cancel(); 1180 } 1181 } 1182 } 1183 } 1184 1185 private boolean waitUntilDone(long cutoff) throws InterruptedException { 1186 boolean hasWait = cutoff != Long.MAX_VALUE; 1187 long lastLog = EnvironmentEdgeManager.currentTime(); 1188 long currentInProgress; 1189 while (0 != (currentInProgress = actionsInProgress.get())) { 1190 long now = EnvironmentEdgeManager.currentTime(); 1191 if (hasWait && (now * 1000L) > cutoff) { 1192 return false; 1193 } 1194 if (!hasWait) { // Only log if wait is infinite. 1195 if (now > lastLog + 10000) { 1196 lastLog = now; 1197 LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress 1198 + " actions to finish on table: " + tableName); 1199 } 1200 } 1201 synchronized (actionsInProgress) { 1202 if (actionsInProgress.get() == 0) break; 1203 if (!hasWait) { 1204 actionsInProgress.wait(10); 1205 } else { 1206 long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); 1207 TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); 1208 } 1209 } 1210 } 1211 return true; 1212 } 1213 1214 @Override 1215 public boolean hasError() { 1216 return errors.hasErrors(); 1217 } 1218 1219 @Override 1220 public List<? extends Row> getFailedOperations() { 1221 return errors.actions; 1222 } 1223 1224 @Override 1225 public RetriesExhaustedWithDetailsException getErrors() { 1226 return errors.makeException(asyncProcess.logBatchErrorDetails); 1227 } 1228 1229 @Override 1230 public Object[] getResults() throws InterruptedIOException { 1231 waitUntilDone(); 1232 return results; 1233 } 1234 1235 /** 1236 * Creates the server error tracker to use inside process. 1237 * Currently, to preserve the main assumption about current retries, and to work well with 1238 * the retry-limit-based calculation, the calculation is local per Process object. 1239 * We may benefit from connection-wide tracking of server errors. 1240 * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection 1241 */ 1242 private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { 1243 return new ConnectionImplementation.ServerErrorTracker( 1244 asyncProcess.serverTrackerTimeout, asyncProcess.numTries); 1245 } 1246 1247 /** 1248 * Create a callable. Isolated to be easily overridden in the tests. 1249 */ 1250 private MultiServerCallable createCallable(final ServerName server, TableName tableName, 1251 final MultiAction multi) { 1252 return new MultiServerCallable(asyncProcess.connection, tableName, server, 1253 multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority()); 1254 } 1255 1256 private void updateResult(int index, Object result) { 1257 Object current = results[index]; 1258 if (current != null) { 1259 if (LOG.isDebugEnabled()) { 1260 LOG.debug("The result is assigned repeatedly! current:" + current 1261 + ", new:" + result); 1262 } 1263 } 1264 results[index] = result; 1265 } 1266 1267 @VisibleForTesting 1268 long getNumberOfActionsInProgress() { 1269 return actionsInProgress.get(); 1270 } 1271}