001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.net.SocketTimeoutException; 023import java.util.ArrayList; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.Date; 027import java.util.HashMap; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import java.util.Set; 032import java.util.concurrent.ConcurrentHashMap; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.RejectedExecutionException; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicLong; 037import org.apache.hadoop.hbase.DoNotRetryIOException; 038import org.apache.hadoop.hbase.HBaseServerException; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.RegionLocations; 042import org.apache.hadoop.hbase.RetryImmediatelyException; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 
046import org.apache.hadoop.hbase.client.coprocessor.Batch; 047import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.yetus.audience.InterfaceAudience; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * The context, and return value, for a single submit/submitAll call. Note on how this class (one AP 056 * submit) works. Initially, all requests are split into groups by server; request is sent to each 057 * server in parallel; the RPC calls are not async so a thread per server is used. Every time some 058 * actions fail, regions/locations might have changed, so we re-group them by server and region 059 * again and send these groups in parallel too. The result, in case of retries, is a "tree" of 060 * threads, with parent exiting after scheduling children. This is why lots of code doesn't require 061 * any synchronization. 062 */ 063@InterfaceAudience.Private 064class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { 065 066 private static final Logger LOG = LoggerFactory.getLogger(AsyncRequestFutureImpl.class); 067 068 private RetryingTimeTracker tracker; 069 070 /** 071 * Runnable (that can be submitted to thread pool) that waits for when it's time to issue replica 072 * calls, finds region replicas, groups the requests by replica and issues the calls (on separate 073 * threads, via sendMultiAction). This is done on a separate thread because we don't want to wait 074 * on user thread for our asynchronous call, and usually we have to wait before making replica 075 * calls. 
076 */ 077 private final class ReplicaCallIssuingRunnable implements Runnable { 078 private final long startTime; 079 private final List<Action> initialActions; 080 081 public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) { 082 this.initialActions = initialActions; 083 this.startTime = startTime; 084 } 085 086 @Override 087 public void run() { 088 boolean done = false; 089 if (asyncProcess.primaryCallTimeoutMicroseconds > 0) { 090 try { 091 done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds); 092 } catch (InterruptedException ex) { 093 LOG.error("Replica thread interrupted - no replica calls {}", ex.getMessage()); 094 return; 095 } 096 } 097 if (done) return; // Done within primary timeout 098 Map<ServerName, MultiAction> actionsByServer = new HashMap<>(); 099 List<Action> unknownLocActions = new ArrayList<>(); 100 if (replicaGetIndices == null) { 101 for (int i = 0; i < results.length; ++i) { 102 addReplicaActions(i, actionsByServer, unknownLocActions); 103 } 104 } else { 105 for (int replicaGetIndice : replicaGetIndices) { 106 addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions); 107 } 108 } 109 if (!actionsByServer.isEmpty()) { 110 sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); 111 } 112 if (!unknownLocActions.isEmpty()) { 113 actionsByServer = new HashMap<>(); 114 for (Action action : unknownLocActions) { 115 addReplicaActionsAgain(action, actionsByServer); 116 } 117 // Some actions may have completely failed, they are handled inside addAgain. 118 if (!actionsByServer.isEmpty()) { 119 sendMultiAction(actionsByServer, 1, null, true); 120 } 121 } 122 } 123 124 /** 125 * Add replica actions to action map by server. 126 * @param index Index of the original action. 127 * @param actionsByServer The map by server to add it to. 
128 */ 129 private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer, 130 List<Action> unknownReplicaActions) { 131 if (results[index] != null) return; // opportunistic. Never goes from non-null to null. 132 Action action = initialActions.get(index); 133 RegionLocations loc = findAllLocationsOrFail(action, true); 134 if (loc == null) return; 135 HRegionLocation[] locs = loc.getRegionLocations(); 136 if (locs.length == 1) { 137 LOG.warn("No replicas found for {}", action.getAction()); 138 return; 139 } 140 synchronized (replicaResultLock) { 141 // Don't run replica calls if the original has finished. We could do it e.g. if 142 // original has already failed before first replica call (unlikely given retries), 143 // but that would require additional synchronization w.r.t. returning to caller. 144 if (results[index] != null) return; 145 // We set the number of calls here. After that any path must call setResult/setError. 146 // True even for replicas that are not found - if we refuse to send we MUST set error. 
147 updateResult(index, new ReplicaResultState(locs.length)); 148 } 149 for (int i = 1; i < locs.length; ++i) { 150 Action replicaAction = new Action(action, i); 151 if (locs[i] != null) { 152 asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), 153 replicaAction, actionsByServer, nonceGroup); 154 } else { 155 unknownReplicaActions.add(replicaAction); 156 } 157 } 158 } 159 160 private void addReplicaActionsAgain(Action action, 161 Map<ServerName, MultiAction> actionsByServer) { 162 if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) { 163 throw new AssertionError("Cannot have default replica here"); 164 } 165 HRegionLocation loc = getReplicaLocationOrFail(action); 166 if (loc == null) return; 167 asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(), action, 168 actionsByServer, nonceGroup); 169 } 170 } 171 172 /** 173 * Runnable (that can be submitted to thread pool) that submits MultiAction to a single server. 174 * The server call is synchronous, therefore we do it on a thread pool. 
*/
  final class SingleServerRequestRunnable implements Runnable {
    private final MultiAction multiAction;
    private final int numAttempt;
    private final ServerName server;
    // May be null; every use below is null-checked.
    private final Set<CancellableRegionServerCallable> callsInProgress;

    SingleServerRequestRunnable(MultiAction multiAction, int numAttempt, ServerName server,
      Set<CancellableRegionServerCallable> callsInProgress) {
      this.multiAction = multiAction;
      this.numAttempt = numAttempt;
      this.server = server;
      this.callsInProgress = callsInProgress;
    }

    /**
     * Issues one call without retries and dispatches on the outcome: a null response returns
     * silently, a thrown exception goes to receiveGlobalFailure, a MULTI response goes to
     * receiveMultiAction, and any other response is stored as result 0. Task counters are always
     * decremented in the finally block.
     */
    @Override
    public void run() {
      AbstractResponse res = null;
      CancellableRegionServerCallable callable = currentCallable;
      try {
        // setup the callable based on the actions, if we don't have one already from the request
        if (callable == null) {
          callable = createCallable(server, tableName, multiAction);
        }
        RpcRetryingCaller<AbstractResponse> caller =
          asyncProcess.createCaller(callable, rpcTimeout);
        try {
          if (callsInProgress != null) {
            callsInProgress.add(callable);
          }
          res = caller.callWithoutRetries(callable, operationTimeout);
          if (res == null) {
            // Cancelled
            return;
          }
        } catch (IOException e) {
          // The service itself failed. It may be an error coming from the communication
          // layer, but, as well, a functional error raised by the server.
          receiveGlobalFailure(multiAction, server, numAttempt, e);
          return;
        } catch (Throwable t) {
          // This should not happen. Let's log & retry anyway.
          LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected."
            + " Retrying. Server=" + server + ", tableName=" + tableName, t);
          receiveGlobalFailure(multiAction, server, numAttempt, t);
          return;
        }
        if (res.type() == AbstractResponse.ResponseType.MULTI) {
          // Normal case: we received an answer from the server, and it's not an exception.
          receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
        } else {
          // Any non-MULTI response is treated as a SingleResponse for a single action.
          if (results != null) {
            SingleResponse singleResponse = (SingleResponse) res;
            updateResult(0, singleResponse.getEntry());
          }
          decActionCounter(1);
        }
      } catch (Throwable t) {
        // Something really bad happened. We are on the send thread that will now die.
        LOG.error("id=" + asyncProcess.id + " error for " + tableName + " processing " + server, t);
        throw new RuntimeException(t);
      } finally {
        asyncProcess.decTaskCounters(multiAction.getRegions(), server);
        // NOTE(review): the callable is only removed when a response was received; after a
        // null response or an exception it stays in callsInProgress - confirm this is intended.
        if (callsInProgress != null && callable != null && res != null) {
          callsInProgress.remove(callable);
        }
      }
    }
  }

  private final Batch.Callback<CResult> callback;
  private final BatchErrors errors;
  private final ConnectionImplementation.ServerErrorTracker errorsByServer;
  private final ExecutorService pool;
  // Null when the constructor found no replica gets among the submitted actions.
  private final Set<CancellableRegionServerCallable> callsInProgress;

  private final TableName tableName;
  // Starts at -1; the constructor sets it to the number of submitted actions.
  private final AtomicLong actionsInProgress = new AtomicLong(-1);
  /**
   * The lock controls access to results. It is only held when populating results where there might
   * be several callers (eventual consistency gets). For other requests, there's one unique call
   * going on per result index.
   */
  private final Object replicaResultLock = new Object();
  /**
   * Result array. Null if results are not needed. Otherwise, each index corresponds to the action
   * index in initial actions submitted. For most request types, has null-s for requests that are
   * not done, and result/exception for those that are done. For eventual-consistency gets,
   * initially the same applies; at some point, replica calls might be started, and
   * ReplicaResultState is put at the corresponding indices. The returning calls check the type to
   * detect when this is the case.
* After all calls are done, ReplicaResultState-s are replaced with
   * results for the user.
   */
  private final Object[] results;
  /**
   * Indices of replica gets in results. If null, all or no actions are replica-gets.
   */
  private final int[] replicaGetIndices;
  private final boolean hasAnyReplicaGets;
  private final long nonceGroup;
  // Callable supplied by the task; may be null, in which case one is created per request.
  private final CancellableRegionServerCallable currentCallable;
  private final int operationTimeout;
  private final int rpcTimeout;
  private final AsyncProcess asyncProcess;

  /**
   * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
   * used to make logging more clear, we don't actually care why we don't retry.
   */
  public enum Retry {
    YES,
    NO_LOCATION_PROBLEM,
    NO_NOT_RETRIABLE,
    NO_RETRIES_EXHAUSTED,
    NO_OTHER_SUCCEEDED
  }

  /**
   * Sync point for calls to multiple replicas for the same user request (Get). Created and put in
   * the results array (we assume replica calls require results) when the replica calls are
   * launched. See results for details of this process. Plain holder whose fields are accessed
   * directly (they are package-private, not public). To modify them, the object itself is locked.
   */
  private static class ReplicaResultState {
    public ReplicaResultState(int callCount) {
      this.callCount = callCount;
    }

    /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
    int callCount;
    /**
     * Errors for which it is not decided whether we will report them to user. If one of the calls
     * succeeds, we will discard the errors that may have happened in the other calls.
     */
    BatchErrors replicaErrors = null;

    @Override
    public String toString() {
      return "[call count " + callCount + "; errors " + replicaErrors + "]";
    }
  }

  /**
   * Builds the future for one submit call and works out which actions may need replica calls.
   * @param task         supplies the pool, callback, table name, optional result array,
   *                     timeouts and optional callable
   * @param actions      the actions of this submit; the result array, if any, has one slot per
   *                     action
   * @param nonceGroup   nonce group passed along when actions are grouped by server
   * @param asyncProcess the owning AsyncProcess
   * @throws AssertionError if the task supplies a result array whose length differs from the
   *                        number of actions
   */
  public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long nonceGroup,
    AsyncProcess asyncProcess) {
    this.pool = task.getPool();
    this.callback = task.getCallback();
    this.nonceGroup = nonceGroup;
    this.tableName = task.getTableName();
    this.actionsInProgress.set(actions.size());
    if (task.getResults() == null) {
      results = task.getNeedResults() ? new Object[actions.size()] : null;
    } else {
      if (task.getResults().length != actions.size()) {
        throw new AssertionError("results.length");
      }
      this.results = task.getResults();
      // A caller-supplied array is reused, so clear whatever it held before.
      for (int i = 0; i != this.results.length; ++i) {
        results[i] = null;
      }
    }
    List<Integer> replicaGetIndices = null;
    boolean hasAnyReplicaGets = false;
    if (results != null) {
      // Check to see if any requests might require replica calls.
      // We expect that many requests will consist of all or no multi-replica gets; in such
      // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
      // store the list of action indexes for which replica gets are possible, and set
      // hasAnyReplicaGets to true.
      boolean hasAnyNonReplicaReqs = false;
      int posInList = 0;
      for (Action action : actions) {
        boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction());
        if (isReplicaGet) {
          hasAnyReplicaGets = true;
          if (hasAnyNonReplicaReqs) { // Mixed case
            if (replicaGetIndices == null) {
              replicaGetIndices = new ArrayList<>(actions.size() - 1);
            }
            replicaGetIndices.add(posInList);
          }
        } else if (!hasAnyNonReplicaReqs) {
          // The first non-multi-replica request in the action list.
          hasAnyNonReplicaReqs = true;
          if (posInList > 0) {
            // Add all the previous requests to the index lists. We know they are all
            // replica-gets because this is the first non-multi-replica request in the list.
            replicaGetIndices = new ArrayList<>(actions.size() - 1);
            for (int i = 0; i < posInList; ++i) {
              replicaGetIndices.add(i);
            }
          }
        }
        ++posInList;
      }
    }
    this.hasAnyReplicaGets = hasAnyReplicaGets;
    if (replicaGetIndices != null) {
      // Copy the boxed index list into the primitive array kept on the instance.
      this.replicaGetIndices = new int[replicaGetIndices.size()];
      int i = 0;
      for (Integer el : replicaGetIndices) {
        this.replicaGetIndices[i++] = el;
      }
    } else {
      this.replicaGetIndices = null;
    }
    // In-progress calls are only tracked when there is at least one replica get.
    this.callsInProgress = !hasAnyReplicaGets
      ? null
      : Collections
        .newSetFromMap(new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
    this.asyncProcess = asyncProcess;
    this.errorsByServer = createServerErrorTracker();
    this.errors = new BatchErrors();
    this.operationTimeout = task.getOperationTimeout();
    this.rpcTimeout = task.getRpcTimeout();
    this.currentCallable = task.getCallable();
    // The time tracker is only started when the task did not bring its own callable.
    if (task.getCallable() == null) {
      tracker = new RetryingTimeTracker().start();
    }
  }

  /** Returns the set of in-progress calls, or null when no replica gets were submitted. */
  protected Set<CancellableRegionServerCallable> getCallsInProgress() {
    return callsInProgress;
  }

  /** Factory for the per-server runnable; simply forwards its arguments to the constructor. */
  SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt,
    ServerName server, Set<CancellableRegionServerCallable> callsInProgress) {
    return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
  }

  /**
   * Group a list of actions per region servers, and send them.
* @param currentActions - the list of row to submit
   * @param numAttempt     - the current numAttempt (first attempt is 1)
   */
  void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();

    boolean isReplica = false;
    List<Action> unknownReplicaActions = null;
    for (Action action : currentActions) {
      RegionLocations locs = findAllLocationsOrFail(action, true);
      // A null here means the lookup threw and the error was already recorded.
      if (locs == null) continue;
      boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
      if (isReplica && !isReplicaAction) {
        // This is the property of the current implementation, not a requirement.
        throw new AssertionError("Replica and non-replica actions in the same retry");
      }
      isReplica = isReplicaAction;
      HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
      if (loc == null || loc.getServerName() == null) {
        if (isReplica) {
          // Replica with no known location: set aside for a second lookup below.
          if (unknownReplicaActions == null) {
            unknownReplicaActions = new ArrayList<>(1);
          }
          unknownReplicaActions.add(action);
        } else {
          // TODO: relies on primary location always being fetched
          manageLocationError(action, null);
        }
      } else {
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer,
          nonceGroup);
      }
    }
    boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
    boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();

    if (!actionsByServer.isEmpty()) {
      // If this is a first attempt to group and send, no replicas, we need replica thread.
      // When unknown-location actions remain, both the replica thread and thread reuse are left
      // to the second send below.
      sendMultiAction(actionsByServer, numAttempt,
        (doStartReplica && !hasUnknown) ? currentActions : null, numAttempt > 1 && !hasUnknown);
    }

    if (hasUnknown) {
      actionsByServer = new HashMap<>();
      for (Action action : unknownReplicaActions) {
        HRegionLocation loc = getReplicaLocationOrFail(action);
        if (loc == null) continue;
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer,
          nonceGroup);
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
      }
    }
  }

  /**
   * Looks up the location for the action's replica id, first with the cache and, if that yields
   * no usable location, once more without it.
   * @param action the action whose replica location is wanted
   * @return the location, or null if none could be found (the error has then been recorded)
   */
  private HRegionLocation getReplicaLocationOrFail(Action action) {
    // We are going to try get location once again. For each action, we'll do it once
    // from cache, because the previous calls in the loop might populate it.
    int replicaId = action.getReplicaId();
    RegionLocations locs = findAllLocationsOrFail(action, true);
    if (locs == null) return null; // manageError already called
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    if (loc == null || loc.getServerName() == null) {
      locs = findAllLocationsOrFail(action, false);
      if (locs == null) return null; // manageError already called
      loc = locs.getRegionLocation(replicaId);
    }
    if (loc == null || loc.getServerName() == null) {
      manageLocationError(action, null);
      return null;
    }
    return loc;
  }

  /**
   * Logs a location failure and records it against the action with
   * {@link Retry#NO_LOCATION_PROBLEM}.
   * @param action the action whose location could not be determined
   * @param ex     the cause; when null an IOException carrying the log message is created
   */
  private void manageLocationError(Action action, Exception ex) {
    String msg =
      "Cannot get replica " + action.getReplicaId() + " location for " + action.getAction();
    LOG.error(msg);
    if (ex == null) {
      ex = new IOException(msg);
    }
    manageError(action.getOriginalIndex(), action.getAction(), Retry.NO_LOCATION_PROBLEM, ex, null);
  }

  /**
   * Locates the region(s) for the action's row via the connection.
   * @param action   the action to locate; its row must not be null
   * @param useCache passed through to the connection's locateRegion call
   * @return the locations, or null if the lookup threw an IOException (the error is recorded via
   *         manageLocationError before returning)
   * @throws IllegalArgumentException if the action carries no row
   */
  private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) {
    if (action.getAction() == null)
      throw new IllegalArgumentException("#" + asyncProcess.id + ", row cannot be null");
    RegionLocations loc = null;
    try {
      loc = asyncProcess.connection.locateRegion(tableName, action.getAction().getRow(), useCache,
        true, action.getReplicaId());
    } catch (IOException ex) {
      manageLocationError(action, ex);
    }
    return loc;
  }

  /**
   * Send a multi action structure to the servers, after a delay depending on the attempt number.
   * Asynchronous.
   * @param actionsByServer         the actions structured by regions
   * @param numAttempt              the attempt number.
   * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
   * @param reuseThread             if true, the last runnable is run directly on the calling
   *                                thread instead of the pool, unless numAttempt is a multiple of
   *                                HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER
   */
  void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttempt,
    List<Action> actionsForReplicaThread, boolean reuseThread) {
    // Run the last item on the same thread if we are already on a send thread.
    // We hope most of the time it will be the only item, so we can cut down on threads.
    int actionsRemaining = actionsByServer.size();
    // This iteration is by server (the HRegionLocation comparator is by server portion only).
    for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
      ServerName server = e.getKey();
      MultiAction multiAction = e.getValue();
      Collection<? extends Runnable> runnables =
        getNewMultiActionRunnable(server, multiAction, numAttempt);
      // make sure we correctly count the number of runnables before we try to reuse the send
      // thread, in case we had to split the request into different runnables because of backoff
      if (runnables.size() > actionsRemaining) {
        actionsRemaining = runnables.size();
      }

      // run all the runnables
      // HBASE-17475: Do not reuse the thread after stack reach a certain depth to prevent stack
      // overflow
      // for now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to control the depth
      for (Runnable runnable : runnables) {
        if (
          (--actionsRemaining == 0) && reuseThread
            && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0
        ) {
          runnable.run();
        } else {
          try {
            pool.execute(runnable);
          } catch (Throwable t) {
            if (t instanceof RejectedExecutionException) {
              // This should never happen. But as the pool is provided by the end user,
              // let's secure this a little.
              LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + " Server="
                + server.getServerName(), t);
            } else {
              // see #HBASE-14359 for more details
              LOG.warn("Caught unexpected exception/error: ", t);
            }
            // The runnable never ran, so undo the counter increment done when it was created.
            asyncProcess.decTaskCounters(multiAction.getRegions(), server);
            // We're likely to fail again, but this will increment the attempt counter,
            // so it will finish.
            receiveGlobalFailure(multiAction, server, numAttempt, t);
          }
        }
      }
    }

    if (actionsForReplicaThread != null) {
      startWaitingForReplicaCalls(actionsForReplicaThread);
    }
  }

  /**
   * Builds the runnable(s) for one server. Without a statistics tracker a single runnable covers
   * the whole MultiAction; otherwise the regions are grouped by their backoff time and one
   * runnable is created per distinct backoff. Task counters are incremented for every runnable
   * created.
   * @param server      the destination server
   * @param multiAction the actions for that server
   * @param numAttempt  the attempt number, passed on to the created runnables
   * @return the runnables to execute
   */
  @SuppressWarnings("MixedMutabilityReturnType")
  private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
    MultiAction multiAction, int numAttempt) {
    // no stats to manage, just do the standard action
    if (asyncProcess.connection.getStatisticsTracker() == null) {
      if (asyncProcess.connection.getConnectionMetrics() != null) {
        asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
      }
      asyncProcess.incTaskCounters(multiAction.getRegions(), server);
      SingleServerRequestRunnable runnable =
        createSingleServerRequest(multiAction, numAttempt, server, callsInProgress);

      // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
      return Collections.singletonList(runnable);
    }

    // group the actions by the amount of delay
    Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size());

    // split up the actions
    for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) {
      Long backoff = getBackoff(server, e.getKey());
      DelayingRunner runner = actions.get(backoff);
      if (runner == null) {
        actions.put(backoff, new DelayingRunner(backoff, e));
      } else {
        runner.add(e);
      }
    }

    List<Runnable> toReturn = new ArrayList<>(actions.size());
    for (DelayingRunner runner : actions.values()) {
      asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
      Runnable runnable =
        createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
      // use a delay runner only if we need to sleep for some time
      if (runner.getSleepTime() > 0) {
        runner.setRunner(runnable);
        runnable = runner;
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics()
            .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
        }
      } else {
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
        }
      }
      // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
      toReturn.add(runnable);

    }
    return toReturn;
  }

  /**
   * @param server     server location where the target region is hosted
   * @param regionName name of the region which we are going to write some data
   * @return the amount of time the client should wait until it submit a request to the specified
   *         server and region
   */
  private Long getBackoff(ServerName server, byte[] regionName) {
    // NOTE(review): this local shadows the RetryingTimeTracker field also named "tracker".
    ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker();
    ServerStatistics stats = tracker.getStats(server);
    return asyncProcess.connection.getBackoffPolicy().getBackoffTime(server, regionName, stats);
  }

  /**
   * Starts waiting to issue replica calls on a different thread; or issues them immediately.
   */
  private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) {
    long startTime = EnvironmentEdgeManager.currentTime();
    ReplicaCallIssuingRunnable replicaRunnable =
      new ReplicaCallIssuingRunnable(actionsForReplicaThread, startTime);
    if (asyncProcess.primaryCallTimeoutMicroseconds == 0) {
      // Start replica calls immediately.
      replicaRunnable.run();
    } else {
      // Start the thread that may kick off replica gets.
      // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
      try {
        pool.execute(replicaRunnable);
      } catch (RejectedExecutionException ree) {
        // Best effort: without the replica task only the primary calls remain.
        LOG.warn("id=" + asyncProcess.id + " replica task rejected by pool; no replica calls", ree);
      }
    }
  }

  /**
   * Check that we can retry acts accordingly: logs, set the error status.
   * @param originalIndex the position in the list sent
   * @param row           the row
   * @param canRetry      the caller's retry decision; anything other than Retry.YES means we
   *                      won't retry whatever the settings.
660 * @param throwable the throwable, if any (can be null) 661 * @param server the location, if any (can be null) 662 * @return true if the action can be retried, false otherwise. 663 */ 664 Retry manageError(int originalIndex, Row row, Retry canRetry, Throwable throwable, 665 ServerName server) { 666 if (canRetry == Retry.YES && throwable != null && throwable instanceof DoNotRetryIOException) { 667 canRetry = Retry.NO_NOT_RETRIABLE; 668 } 669 670 if (canRetry != Retry.YES) { 671 // Batch.Callback<Res> was not called on failure in 0.94. We keep this. 672 setError(originalIndex, row, throwable, server); 673 } else if (isActionComplete(originalIndex, row)) { 674 canRetry = Retry.NO_OTHER_SUCCEEDED; 675 } 676 return canRetry; 677 } 678 679 /** 680 * Resubmit all the actions from this multiaction after a failure. 681 * @param rsActions the actions still to do from the initial list 682 * @param server the destination 683 * @param numAttempt the number of attempts so far 684 * @param t the throwable (if any) that caused the resubmit 685 */ 686 private void receiveGlobalFailure(MultiAction rsActions, ServerName server, int numAttempt, 687 Throwable t) { 688 errorsByServer.reportServerError(server); 689 Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; 690 691 cleanServerCache(server, t); 692 int failed = 0; 693 int stopped = 0; 694 List<Action> toReplay = new ArrayList<>(); 695 for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) { 696 byte[] regionName = e.getKey(); 697 byte[] row = e.getValue().get(0).getAction().getRow(); 698 // Do not use the exception for updating cache because it might be coming from 699 // any of the regions in the MultiAction. 700 updateCachedLocations(server, regionName, row, 701 ClientExceptionsUtil.isMetaClearingException(t) ? 
null : t); 702 for (Action action : e.getValue()) { 703 Retry retry = 704 manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server); 705 if (retry == Retry.YES) { 706 toReplay.add(action); 707 } else if (retry == Retry.NO_OTHER_SUCCEEDED) { 708 ++stopped; 709 } else { 710 ++failed; 711 } 712 } 713 } 714 715 if (toReplay.isEmpty()) { 716 logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped); 717 } else { 718 resubmit(server, toReplay, numAttempt, rsActions.size(), t); 719 } 720 } 721 722 /** 723 * Log as much info as possible, and, if there is something to replay, submit it again after a 724 * back off sleep. 725 */ 726 private void resubmit(ServerName oldServer, List<Action> toReplay, int numAttempt, 727 int failureCount, Throwable throwable) { 728 // We have something to replay. We're going to sleep a little before. 729 730 // We have two contradicting needs here: 731 // 1) We want to get the new location after having slept, as it may change. 732 // 2) We want to take into account the location when calculating the sleep time. 733 // 3) If all this is just because the response needed to be chunked try again FAST. 734 // It should be possible to have some heuristics to take the right decision. Short term, 735 // we go for one. 736 boolean retryImmediately = throwable instanceof RetryImmediatelyException; 737 int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; 738 long backOffTime; 739 if (retryImmediately) { 740 backOffTime = 0; 741 } else if (HBaseServerException.isServerOverloaded(throwable)) { 742 // Give a special check when encountering an exception indicating the server is overloaded. 
743 // see #HBASE-17114 and HBASE-26807 744 backOffTime = errorsByServer.calculateBackoffTime(oldServer, 745 asyncProcess.connectionConfiguration.getPauseMillisForServerOverloaded()); 746 } else { 747 backOffTime = errorsByServer.calculateBackoffTime(oldServer, 748 asyncProcess.connectionConfiguration.getPauseMillis()); 749 } 750 if (numAttempt > asyncProcess.startLogErrorsCnt) { 751 // We use this value to have some logs when we have multiple failures, but not too many 752 // logs, as errors are to be expected when a region moves, splits and so on 753 LOG.info(createLog(numAttempt, failureCount, toReplay.size(), oldServer, throwable, 754 backOffTime, true, null, -1, -1)); 755 } 756 757 try { 758 if (backOffTime > 0) { 759 Thread.sleep(backOffTime); 760 } 761 } catch (InterruptedException e) { 762 LOG.warn( 763 "#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e); 764 Thread.currentThread().interrupt(); 765 return; 766 } 767 768 groupAndSendMultiAction(toReplay, nextAttemptNumber); 769 } 770 771 private void logNoResubmit(ServerName oldServer, int numAttempt, int failureCount, 772 Throwable throwable, int failed, int stopped) { 773 if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) { 774 @SuppressWarnings("JavaUtilDate") 775 String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString(); 776 String logMessage = createLog(numAttempt, failureCount, 0, oldServer, throwable, -1, false, 777 timeStr, failed, stopped); 778 if (failed != 0) { 779 // Only log final failures as warning 780 LOG.warn(logMessage); 781 } else { 782 LOG.info(logMessage); 783 } 784 } 785 } 786 787 /** 788 * Called when we receive the result of a server query. 789 * @param multiAction - the multiAction we sent 790 * @param server - the location. It's used as a server name. 
791 * @param responses - the response, if any 792 * @param numAttempt - the attempt 793 */ 794 private void receiveMultiAction(MultiAction multiAction, ServerName server, 795 MultiResponse responses, int numAttempt) { 796 assert responses != null; 797 updateStats(server, responses); 798 // Success or partial success 799 // Analyze detailed results. We can still have individual failures to be redo. 800 // two specific throwables are managed: 801 // - DoNotRetryIOException: we continue to retry for other actions 802 // - RegionMovedException: we update the cache with the new region location 803 Map<byte[], MultiResponse.RegionResult> results = responses.getResults(); 804 List<Action> toReplay = new ArrayList<>(); 805 Throwable lastException = null; 806 int failureCount = 0; 807 int failed = 0; 808 int stopped = 0; 809 Retry retry = null; 810 // Go by original action. 811 for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) { 812 byte[] regionName = regionEntry.getKey(); 813 814 Throwable regionException = responses.getExceptions().get(regionName); 815 if (regionException != null) { 816 cleanServerCache(server, regionException); 817 } 818 819 Map<Integer, Object> regionResults = 820 results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap(); 821 boolean regionFailureRegistered = false; 822 for (Action sentAction : regionEntry.getValue()) { 823 Object result = regionResults.get(sentAction.getOriginalIndex()); 824 if (result == null) { 825 if (regionException == null) { 826 LOG.error("Server sent us neither results nor exceptions for " 827 + Bytes.toStringBinary(regionName) + ", numAttempt:" + numAttempt); 828 regionException = new RuntimeException("Invalid response"); 829 } 830 // If the row operation encounters the region-lever error, the exception of action may be 831 // null. 
832 result = regionException; 833 } 834 // Failure: retry if it's make sense else update the errors lists 835 if (result instanceof Throwable) { 836 Throwable actionException = (Throwable) result; 837 Row row = sentAction.getAction(); 838 lastException = regionException != null 839 ? regionException 840 : ClientExceptionsUtil.findException(actionException); 841 // Register corresponding failures once per server/once per region. 842 if (!regionFailureRegistered) { 843 regionFailureRegistered = true; 844 updateCachedLocations(server, regionName, row.getRow(), actionException); 845 } 846 if (retry == null) { 847 errorsByServer.reportServerError(server); 848 // We determine canRetry only once for all calls, after reporting server failure. 849 retry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; 850 } 851 ++failureCount; 852 switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException, server)) { 853 case YES: 854 toReplay.add(sentAction); 855 break; 856 case NO_OTHER_SUCCEEDED: 857 ++stopped; 858 break; 859 default: 860 ++failed; 861 break; 862 } 863 } else { 864 invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result); 865 setResult(sentAction, result); 866 } 867 } 868 } 869 if (toReplay.isEmpty()) { 870 logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped); 871 } else { 872 resubmit(server, toReplay, numAttempt, failureCount, lastException); 873 } 874 } 875 876 private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row, 877 Throwable rowException) { 878 if (tableName == null) { 879 return; 880 } 881 try { 882 asyncProcess.connection.updateCachedLocations(tableName, regionName, row, rowException, 883 server); 884 } catch (Throwable ex) { 885 // That should never happen, but if it did, we want to make sure 886 // we still process errors 887 LOG.error("Couldn't update cached region locations: " + ex); 888 } 889 } 890 891 private void 
invokeCallBack(byte[] regionName, byte[] row, CResult result) { 892 if (callback != null) { 893 try { 894 // noinspection unchecked 895 // TODO: would callback expect a replica region name if it gets one? 896 this.callback.update(regionName, row, result); 897 } catch (Throwable t) { 898 LOG.error( 899 "User callback threw an exception for " + Bytes.toStringBinary(regionName) + ", ignoring", 900 t); 901 } 902 } 903 } 904 905 private void cleanServerCache(ServerName server, Throwable regionException) { 906 if (ClientExceptionsUtil.isMetaClearingException(regionException)) { 907 // We want to make sure to clear the cache in case there were location-related exceptions. 908 // We don't to clear the cache for every possible exception that comes through, however. 909 asyncProcess.connection.clearCaches(server); 910 } 911 } 912 913 protected void updateStats(ServerName server, MultiResponse resp) { 914 ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()), 915 Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), server, resp); 916 } 917 918 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn, 919 Throwable error, long backOffTime, boolean willRetry, String startTime, int failed, 920 int stopped) { 921 StringBuilder sb = new StringBuilder(); 922 sb.append("id=").append(asyncProcess.id).append(", table=").append(tableName) 923 .append(", attempt=").append(numAttempt).append("/").append(asyncProcess.numTries) 924 .append(", "); 925 926 if (failureCount > 0 || error != null) { 927 sb.append("failureCount=").append(failureCount).append("ops").append(", last exception=") 928 .append(error); 929 } else { 930 sb.append("succeeded"); 931 } 932 933 sb.append(" on ").append(sn).append(", tracking started ").append(startTime); 934 935 if (willRetry) { 936 sb.append(", retrying after=").append(backOffTime).append("ms") 937 .append(", operationsToReplay=").append(replaySize); 938 } else if 
(failureCount > 0) { 939 if (stopped > 0) { 940 sb.append("; NOT retrying, stopped=").append(stopped) 941 .append(" because successful operation on other replica"); 942 } 943 if (failed > 0) { 944 sb.append("; NOT retrying, failed=").append(failed).append(" -- final attempt!"); 945 } 946 } 947 948 return sb.toString(); 949 } 950 951 /** 952 * Sets the non-error result from a particular action. 953 * @param action Action (request) that the server responded to. 954 * @param result The result. 955 */ 956 private void setResult(Action action, Object result) { 957 if (result == null) { 958 throw new RuntimeException("Result cannot be null"); 959 } 960 boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); 961 int index = action.getOriginalIndex(); 962 if (results == null) { 963 decActionCounter(index); 964 return; // Simple case, no replica requests. 965 } 966 ReplicaResultState state = 967 trySetResultSimple(index, action.getAction(), false, result, null, isStale); 968 if (state == null) { 969 return; // Simple case, no replica requests. 970 } 971 // At this point we know that state is set to replica tracking class. 972 // It could be that someone else is also looking at it; however, we know there can 973 // only be one state object, and only one thread can set callCount to 0. Other threads 974 // will either see state with callCount 0 after locking it; or will not see state at all 975 // we will replace it with the result. 976 synchronized (state) { 977 if (state.callCount == 0) { 978 return; // someone already set the result 979 } 980 state.callCount = 0; 981 } 982 synchronized (replicaResultLock) { 983 if (results[index] != state) { 984 throw new AssertionError("We set the callCount but someone else replaced the result"); 985 } 986 updateResult(index, result); 987 } 988 989 decActionCounter(index); 990 } 991 992 /** 993 * Sets the error from a particular action. 994 * @param index Original action index. 995 * @param row Original request. 
996 * @param throwable The resulting error. 997 * @param server The source server. 998 */ 999 private void setError(int index, Row row, Throwable throwable, ServerName server) { 1000 if (results == null) { 1001 // Note that we currently cannot have replica requests with null results. So it shouldn't 1002 // happen that multiple replica calls will call dAC for same actions with results == null. 1003 // Only one call per action should be present in this case. 1004 errors.add(throwable, row, server); 1005 decActionCounter(index); 1006 return; // Simple case, no replica requests. 1007 } 1008 ReplicaResultState state = trySetResultSimple(index, row, true, throwable, server, false); 1009 if (state == null) { 1010 return; // Simple case, no replica requests. 1011 } 1012 BatchErrors target = null; // Error will be added to final errors, or temp replica errors. 1013 boolean isActionDone = false; 1014 synchronized (state) { 1015 switch (state.callCount) { 1016 case 0: 1017 return; // someone already set the result 1018 case 1: { // All calls failed, we are the last error. 1019 target = errors; 1020 isActionDone = true; 1021 break; 1022 } 1023 default: { 1024 assert state.callCount > 1; 1025 if (state.replicaErrors == null) { 1026 state.replicaErrors = new BatchErrors(); 1027 } 1028 target = state.replicaErrors; 1029 break; 1030 } 1031 } 1032 --state.callCount; 1033 } 1034 target.add(throwable, row, server); 1035 if (isActionDone) { 1036 if (state.replicaErrors != null) { // last call, no need to lock 1037 errors.merge(state.replicaErrors); 1038 } 1039 // See setResult for explanations. 1040 synchronized (replicaResultLock) { 1041 if (results[index] != state) { 1042 throw new AssertionError("We set the callCount but someone else replaced the result"); 1043 } 1044 updateResult(index, throwable); 1045 } 1046 decActionCounter(index); 1047 } 1048 } 1049 1050 /** 1051 * Checks if the action is complete; used on error to prevent needless retries. 
Does not 1052 * synchronize, assuming element index/field accesses are atomic. This is an opportunistic 1053 * optimization check, doesn't have to be strict. 1054 * @param index Original action index. 1055 * @param row Original request. 1056 */ 1057 private boolean isActionComplete(int index, Row row) { 1058 if (!AsyncProcess.isReplicaGet(row)) return false; 1059 Object resObj = results[index]; 1060 return (resObj != null) 1061 && (!(resObj instanceof ReplicaResultState) || ((ReplicaResultState) resObj).callCount == 0); 1062 } 1063 1064 /** 1065 * Tries to set the result or error for a particular action as if there were no replica calls. 1066 * @return null if successful; replica state if there were in fact replica calls. 1067 */ 1068 private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, Object result, 1069 ServerName server, boolean isFromReplica) { 1070 Object resObj = null; 1071 if (!AsyncProcess.isReplicaGet(row)) { 1072 if (isFromReplica) { 1073 throw new AssertionError("Unexpected stale result for " + row); 1074 } 1075 updateResult(index, result); 1076 } else { 1077 synchronized (replicaResultLock) { 1078 resObj = results[index]; 1079 if (resObj == null) { 1080 if (isFromReplica) { 1081 throw new AssertionError("Unexpected stale result for " + row); 1082 } 1083 updateResult(index, result); 1084 } 1085 } 1086 } 1087 1088 ReplicaResultState rrs = 1089 (resObj instanceof ReplicaResultState) ? (ReplicaResultState) resObj : null; 1090 if (rrs == null && isError) { 1091 // The resObj is not replica state (null or already set). 1092 errors.add((Throwable) result, row, server); 1093 } 1094 1095 if (resObj == null) { 1096 // resObj is null - no replica calls were made. 
1097 decActionCounter(index); 1098 return null; 1099 } 1100 return rrs; 1101 } 1102 1103 private void decActionCounter(int index) { 1104 long actionsRemaining = actionsInProgress.decrementAndGet(); 1105 if (actionsRemaining < 0) { 1106 String error = buildDetailedErrorMsg("Incorrect actions in progress", index); 1107 throw new AssertionError(error); 1108 } else if (actionsRemaining == 0) { 1109 synchronized (actionsInProgress) { 1110 actionsInProgress.notifyAll(); 1111 } 1112 } 1113 } 1114 1115 private String buildDetailedErrorMsg(String string, int index) { 1116 StringBuilder error = new StringBuilder(128); 1117 error.append(string).append("; called for ").append(index).append(", actionsInProgress ") 1118 .append(actionsInProgress.get()).append("; replica gets: "); 1119 if (replicaGetIndices != null) { 1120 for (int i = 0; i < replicaGetIndices.length; ++i) { 1121 error.append(replicaGetIndices[i]).append(", "); 1122 } 1123 } else { 1124 error.append(hasAnyReplicaGets ? "all" : "none"); 1125 } 1126 error.append("; results "); 1127 if (results != null) { 1128 for (int i = 0; i < results.length; ++i) { 1129 Object o = results[i]; 1130 error.append(((o == null) ? 
"null" : o.toString())).append(", "); 1131 } 1132 } 1133 return error.toString(); 1134 } 1135 1136 @Override 1137 public void waitUntilDone() throws InterruptedIOException { 1138 try { 1139 if (this.operationTimeout > 0) { 1140 // the worker thread maybe over by some exception without decrement the actionsInProgress, 1141 // then the guarantee of operationTimeout will be broken, so we should set cutoff to avoid 1142 // stuck here forever 1143 long cutoff = (EnvironmentEdgeManager.currentTime() + this.operationTimeout) * 1000L; 1144 if (!waitUntilDone(cutoff)) { 1145 throw new SocketTimeoutException("time out before the actionsInProgress changed to zero"); 1146 } 1147 } else { 1148 waitUntilDone(Long.MAX_VALUE); 1149 } 1150 } catch (InterruptedException iex) { 1151 throw new InterruptedIOException(iex.getMessage()); 1152 } finally { 1153 if (callsInProgress != null) { 1154 for (CancellableRegionServerCallable clb : callsInProgress) { 1155 clb.cancel(); 1156 } 1157 } 1158 } 1159 } 1160 1161 private boolean waitUntilDone(long cutoff) throws InterruptedException { 1162 boolean hasWait = cutoff != Long.MAX_VALUE; 1163 long lastLog = EnvironmentEdgeManager.currentTime(); 1164 long currentInProgress; 1165 while (0 != (currentInProgress = actionsInProgress.get())) { 1166 long now = EnvironmentEdgeManager.currentTime(); 1167 if (hasWait && (now * 1000L) > cutoff) { 1168 return false; 1169 } 1170 if (!hasWait) { // Only log if wait is infinite. 
1171 if (now > lastLog + 10000) { 1172 lastLog = now; 1173 LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress 1174 + " actions to finish on table: " + tableName); 1175 } 1176 } 1177 synchronized (actionsInProgress) { 1178 if (actionsInProgress.get() == 0) break; 1179 if (!hasWait) { 1180 actionsInProgress.wait(10); 1181 } else { 1182 long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); 1183 TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); 1184 } 1185 } 1186 } 1187 return true; 1188 } 1189 1190 @Override 1191 public boolean hasError() { 1192 return errors.hasErrors(); 1193 } 1194 1195 @Override 1196 public List<? extends Row> getFailedOperations() { 1197 return errors.actions; 1198 } 1199 1200 @Override 1201 public RetriesExhaustedWithDetailsException getErrors() { 1202 return errors.makeException(asyncProcess.logBatchErrorDetails); 1203 } 1204 1205 @Override 1206 public Object[] getResults() throws InterruptedIOException { 1207 waitUntilDone(); 1208 return results; 1209 } 1210 1211 /** 1212 * Creates the server error tracker to use inside process. Currently, to preserve the main 1213 * assumption about current retries, and to work well with the retry-limit-based calculation, the 1214 * calculation is local per Process object. We may benefit from connection-wide tracking of server 1215 * errors. 1216 * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection 1217 */ 1218 private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { 1219 return new ConnectionImplementation.ServerErrorTracker(asyncProcess.serverTrackerTimeout, 1220 asyncProcess.numTries); 1221 } 1222 1223 /** 1224 * Create a callable. Isolated to be easily overridden in the tests. 
1225 */ 1226 private MultiServerCallable createCallable(final ServerName server, TableName tableName, 1227 final MultiAction multi) { 1228 return new MultiServerCallable(asyncProcess.connection, tableName, server, multi, 1229 asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority()); 1230 } 1231 1232 private void updateResult(int index, Object result) { 1233 Object current = results[index]; 1234 if (current != null) { 1235 if (LOG.isDebugEnabled()) { 1236 LOG.debug("The result is assigned repeatedly! current:" + current + ", new:" + result); 1237 } 1238 } 1239 results[index] = result; 1240 } 1241 1242 long getNumberOfActionsInProgress() { 1243 return actionsInProgress.get(); 1244 } 1245}