001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 023import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 024import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 025import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.IdentityHashMap; 032import java.util.List; 033import java.util.Map; 034import java.util.Optional; 035import java.util.OptionalLong; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ConcurrentHashMap; 038import java.util.concurrent.ConcurrentLinkedQueue; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.ConcurrentSkipListMap; 041import java.util.concurrent.TimeUnit; 042import java.util.function.Supplier; 043import java.util.stream.Collectors; 044import java.util.stream.Stream; 045import org.apache.commons.lang3.mutable.MutableBoolean; 046import org.apache.hadoop.hbase.DoNotRetryIOException; 047import org.apache.hadoop.hbase.ExtendedCellScannable; 048import org.apache.hadoop.hbase.HBaseServerException; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.HRegionLocation; 051import org.apache.hadoop.hbase.PrivateCellUtil; 052import org.apache.hadoop.hbase.RetryImmediatelyException; 053import org.apache.hadoop.hbase.ServerName; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; 056import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; 057import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 058import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager; 059import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 060import org.apache.hadoop.hbase.ipc.HBaseRpcController; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 063import org.apache.yetus.audience.InterfaceAudience; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067import org.apache.hbase.thirdparty.io.netty.util.Timer; 068 069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 070import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 073 074/** 075 * Retry caller for batch. 076 * <p> 077 * Notice that, the {@link #operationTimeoutNs} is the total time limit now which is the same with 078 * other single operations 079 * <p> 080 * And the {@link #maxAttempts} is a limit for each single operation in the batch logically. In the 081 * implementation, we will record a {@code tries} parameter for each operation group, and if it is 082 * split to several groups when retrying, the sub groups will inherit the {@code tries}. You can 083 * imagine that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of 084 * the depth of the tree. 085 */ 086@InterfaceAudience.Private 087class AsyncBatchRpcRetryingCaller<T> { 088 089 private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class); 090 091 private final Timer retryTimer; 092 093 private final AsyncConnectionImpl conn; 094 095 private final TableName tableName; 096 097 private final List<Action> actions; 098 099 private final List<CompletableFuture<T>> futures; 100 101 private final IdentityHashMap<Action, CompletableFuture<T>> action2Future; 102 103 private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors; 104 105 private final int maxAttempts; 106 107 private final long operationTimeoutNs; 108 109 private final long rpcTimeoutNs; 110 111 private final int startLogErrorsCnt; 112 113 private final long startNs; 114 115 private final HBaseServerExceptionPauseManager pauseManager; 116 117 private final Map<String, byte[]> requestAttributes; 118 119 // we can not use HRegionLocation as the map key because the hashCode and equals method of 120 // HRegionLocation only consider serverName. 121 private static final class RegionRequest { 122 123 public final HRegionLocation loc; 124 125 public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>(); 126 127 public RegionRequest(HRegionLocation loc) { 128 this.loc = loc; 129 } 130 } 131 132 private static final class ServerRequest { 133 134 public final ConcurrentMap<byte[], RegionRequest> actionsByRegion = 135 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); 136 137 public void addAction(HRegionLocation loc, Action action) { 138 computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(), 139 () -> new RegionRequest(loc)).actions.add(action); 140 } 141 142 public void setRegionRequest(byte[] regionName, RegionRequest regionReq) { 143 actionsByRegion.put(regionName, regionReq); 144 } 145 146 public int getPriority() { 147 return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream()) 148 .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET); 149 } 150 } 151 152 public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, 153 TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded, 154 int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, 155 Map<String, byte[]> requestAttributes) { 156 this.retryTimer = retryTimer; 157 this.conn = conn; 158 this.tableName = tableName; 159 this.maxAttempts = maxAttempts; 160 this.operationTimeoutNs = operationTimeoutNs; 161 this.rpcTimeoutNs = rpcTimeoutNs; 162 this.startLogErrorsCnt = startLogErrorsCnt; 163 this.actions = new ArrayList<>(actions.size()); 164 this.futures = new ArrayList<>(actions.size()); 165 this.action2Future = new IdentityHashMap<>(actions.size()); 166 for (int i = 0, n = actions.size(); i < n; i++) { 167 Row rawAction = actions.get(i); 168 Action action; 169 if (rawAction instanceof OperationWithAttributes) { 170 action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority()); 171 } else { 172 action = new Action(rawAction, i); 173 } 174 if (hasIncrementOrAppend(rawAction)) { 175 action.setNonce(conn.getNonceGenerator().newNonce()); 176 } 177 this.actions.add(action); 178 CompletableFuture<T> future = new CompletableFuture<>(); 179 futures.add(future); 180 action2Future.put(action, future); 181 } 182 this.action2Errors = new IdentityHashMap<>(); 183 this.startNs = System.nanoTime(); 184 this.pauseManager = 185 new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs); 186 this.requestAttributes = requestAttributes; 187 } 188 189 private static boolean hasIncrementOrAppend(Row action) { 190 if (action instanceof Append || action instanceof Increment) { 191 return true; 192 } else if (action instanceof RowMutations) { 193 return hasIncrementOrAppend((RowMutations) action); 194 } else if (action instanceof CheckAndMutate) { 195 return hasIncrementOrAppend(((CheckAndMutate) action).getAction()); 196 } 197 return false; 198 } 199 200 private static boolean hasIncrementOrAppend(RowMutations mutations) { 201 for (Mutation mutation : mutations.getMutations()) { 202 if (mutation instanceof Append || mutation instanceof Increment) { 203 return true; 204 } 205 } 206 return false; 207 } 208 209 private List<ThrowableWithExtraContext> removeErrors(Action action) { 210 synchronized (action2Errors) { 211 return action2Errors.remove(action); 212 } 213 } 214 215 private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier, 216 Throwable error, ServerName serverName) { 217 if (tries > startLogErrorsCnt) { 218 String regions = 219 regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'") 220 .collect(Collectors.joining(",", "[", "]")); 221 LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName 222 + " failed, tries=" + tries, error); 223 } 224 } 225 226 private String getExtraContextForError(ServerName serverName) { 227 return serverName != null ? serverName.getServerName() : ""; 228 } 229 230 private void addError(Action action, Throwable error, ServerName serverName) { 231 List<ThrowableWithExtraContext> errors; 232 synchronized (action2Errors) { 233 errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>()); 234 } 235 errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(), 236 getExtraContextForError(serverName))); 237 } 238 239 private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) { 240 actions.forEach(action -> addError(action, error, serverName)); 241 } 242 243 private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) { 244 CompletableFuture<T> future = action2Future.get(action); 245 if (future.isDone()) { 246 return; 247 } 248 ThrowableWithExtraContext errorWithCtx = 249 new ThrowableWithExtraContext(error, currentTime, extras); 250 List<ThrowableWithExtraContext> errors = removeErrors(action); 251 if (errors == null) { 252 errors = Collections.singletonList(errorWithCtx); 253 } else { 254 errors.add(errorWithCtx); 255 } 256 future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors)); 257 } 258 259 private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) { 260 long currentTime = EnvironmentEdgeManager.currentTime(); 261 String extras = getExtraContextForError(serverName); 262 actions.forEach(action -> failOne(action, tries, error, currentTime, extras)); 263 } 264 265 private void failAll(Stream<Action> actions, int tries) { 266 actions.forEach(action -> { 267 CompletableFuture<T> future = action2Future.get(action); 268 if (future.isDone()) { 269 return; 270 } 271 future.completeExceptionally(new RetriesExhaustedException(tries, 272 Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList()))); 273 }); 274 } 275 276 private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion, 277 List<ExtendedCellScannable> cells, Map<Integer, Integer> indexMap) throws IOException { 278 ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder(); 279 ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder(); 280 ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); 281 ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); 282 for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) { 283 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 284 // multiRequestBuilder will be populated with region actions. 285 // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the 286 // action list. 287 RequestConverter.buildNoDataRegionActions(entry.getKey(), 288 entry.getValue().actions.stream() 289 .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex())) 290 .collect(Collectors.toList()), 291 cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, 292 indexMap); 293 } 294 return multiRequestBuilder.build(); 295 } 296 297 @SuppressWarnings("unchecked") 298 private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName, 299 RegionResult regionResult, List<Action> failedActions, Throwable regionException, 300 MutableBoolean retryImmediately) { 301 Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException); 302 if (result == null) { 303 LOG.error("Server " + serverName + " sent us neither result nor exception for row '" 304 + Bytes.toStringBinary(action.getAction().getRow()) + "' of " 305 + regionReq.loc.getRegion().getRegionNameAsString()); 306 addError(action, new RuntimeException("Invalid response"), serverName); 307 failedActions.add(action); 308 } else if (result instanceof Throwable) { 309 Throwable error = translateException((Throwable) result); 310 logException(tries, () -> Stream.of(regionReq), error, serverName); 311 conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); 312 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 313 failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), 314 getExtraContextForError(serverName)); 315 } else { 316 if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) { 317 retryImmediately.setTrue(); 318 } 319 failedActions.add(action); 320 } 321 } else { 322 action2Future.get(action).complete((T) result); 323 } 324 } 325 326 private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries, 327 ServerName serverName, MultiResponse resp) { 328 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), 329 serverName, resp); 330 List<Action> failedActions = new ArrayList<>(); 331 MutableBoolean retryImmediately = new MutableBoolean(false); 332 actionsByRegion.forEach((rn, regionReq) -> { 333 RegionResult regionResult = resp.getResults().get(rn); 334 Throwable regionException = resp.getException(rn); 335 if (regionResult != null) { 336 regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName, 337 regionResult, failedActions, regionException, retryImmediately)); 338 } else { 339 Throwable error; 340 if (regionException == null) { 341 LOG.error("Server sent us neither results nor exceptions for {}", 342 Bytes.toStringBinary(rn)); 343 error = new RuntimeException("Invalid response"); 344 } else { 345 error = translateException(regionException); 346 } 347 logException(tries, () -> Stream.of(regionReq), error, serverName); 348 conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); 349 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 350 failAll(regionReq.actions.stream(), tries, error, serverName); 351 return; 352 } 353 if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) { 354 retryImmediately.setTrue(); 355 } 356 addError(regionReq.actions, error, serverName); 357 failedActions.addAll(regionReq.actions); 358 } 359 }); 360 if (!failedActions.isEmpty()) { 361 tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null); 362 } 363 } 364 365 private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) { 366 long remainingNs; 367 if (operationTimeoutNs > 0) { 368 remainingNs = pauseManager.remainingTimeNs(startNs); 369 if (remainingNs <= 0) { 370 failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), 371 tries); 372 return; 373 } 374 } else { 375 remainingNs = Long.MAX_VALUE; 376 } 377 ClientService.Interface stub; 378 try { 379 stub = conn.getRegionServerStub(serverName); 380 } catch (IOException e) { 381 onError(serverReq.actionsByRegion, tries, e, serverName); 382 return; 383 } 384 ClientProtos.MultiRequest req; 385 List<ExtendedCellScannable> cells = new ArrayList<>(); 386 // Map from a created RegionAction to the original index for a RowMutations within 387 // the original list of actions. This will be used to process the results when there 388 // is RowMutations/CheckAndMutate in the action list. 389 Map<Integer, Integer> indexMap = new HashMap<>(); 390 try { 391 req = buildReq(serverReq.actionsByRegion, cells, indexMap); 392 } catch (IOException e) { 393 onError(serverReq.actionsByRegion, tries, e, serverName); 394 return; 395 } 396 HBaseRpcController controller = conn.rpcControllerFactory.newController(); 397 resetController(controller, Math.min(rpcTimeoutNs, remainingNs), 398 calcPriority(serverReq.getPriority(), tableName), tableName); 399 controller.setRequestAttributes(requestAttributes); 400 if (!cells.isEmpty()) { 401 controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cells)); 402 } 403 stub.multi(controller, req, resp -> { 404 if (controller.failed()) { 405 onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName); 406 } else { 407 try { 408 onComplete(serverReq.actionsByRegion, tries, serverName, 409 ResponseConverter.getResults(req, indexMap, resp, controller.cellScanner())); 410 } catch (Exception e) { 411 onError(serverReq.actionsByRegion, tries, e, serverName); 412 return; 413 } 414 } 415 }); 416 } 417 418 // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit, 419 // based on the load of the region server and the region. 420 private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) { 421 Optional<MetricsConnection> metrics = conn.getConnectionMetrics(); 422 Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker(); 423 if (!optStats.isPresent()) { 424 actionsByServer.forEach((serverName, serverReq) -> { 425 metrics.ifPresent(MetricsConnection::incrNormalRunners); 426 sendToServer(serverName, serverReq, tries); 427 }); 428 return; 429 } 430 ServerStatisticTracker stats = optStats.get(); 431 ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy(); 432 actionsByServer.forEach((serverName, serverReq) -> { 433 ServerStatistics serverStats = stats.getStats(serverName); 434 Map<Long, ServerRequest> groupByBackoff = new HashMap<>(); 435 serverReq.actionsByRegion.forEach((regionName, regionReq) -> { 436 long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats); 437 groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest()) 438 .setRegionRequest(regionName, regionReq); 439 }); 440 groupByBackoff.forEach((backoff, sr) -> { 441 if (backoff > 0) { 442 metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff)); 443 retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff, 444 TimeUnit.MILLISECONDS); 445 } else { 446 metrics.ifPresent(MetricsConnection::incrNormalRunners); 447 sendToServer(serverName, sr, tries); 448 } 449 }); 450 }); 451 } 452 453 private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t, 454 ServerName serverName) { 455 Throwable error = translateException(t); 456 logException(tries, () -> actionsByRegion.values().stream(), error, serverName); 457 actionsByRegion.forEach( 458 (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error)); 459 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 460 failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error, 461 serverName); 462 return; 463 } 464 List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream()) 465 .collect(Collectors.toList()); 466 addError(copiedActions, error, serverName); 467 tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error); 468 } 469 470 private void tryResubmit(Stream<Action> actions, int tries, boolean immediately, 471 Throwable error) { 472 if (immediately) { 473 groupAndSend(actions, tries); 474 return; 475 } 476 477 OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs); 478 if (!maybePauseNsToUse.isPresent()) { 479 failAll(actions, tries); 480 return; 481 } 482 long delayNs = maybePauseNsToUse.getAsLong(); 483 if (HBaseServerException.isServerOverloaded(error)) { 484 Optional<MetricsConnection> metrics = conn.getConnectionMetrics(); 485 metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); 486 } 487 retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS); 488 } 489 490 private void groupAndSend(Stream<Action> actions, int tries) { 491 long locateTimeoutNs; 492 if (operationTimeoutNs > 0) { 493 locateTimeoutNs = pauseManager.remainingTimeNs(startNs); 494 if (locateTimeoutNs <= 0) { 495 failAll(actions, tries); 496 return; 497 } 498 } else { 499 locateTimeoutNs = -1L; 500 } 501 ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>(); 502 ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>(); 503 addListener(CompletableFuture.allOf(actions 504 .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), 505 RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { 506 if (error != null) { 507 error = unwrapCompletionException(translateException(error)); 508 if (error instanceof DoNotRetryIOException) { 509 failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), ""); 510 return; 511 } 512 addError(action, error, null); 513 locateFailed.add(action); 514 } else { 515 computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc, 516 action); 517 } 518 })) 519 .toArray(CompletableFuture[]::new)), (v, r) -> { 520 if (!actionsByServer.isEmpty()) { 521 sendOrDelay(actionsByServer, tries); 522 } 523 if (!locateFailed.isEmpty()) { 524 tryResubmit(locateFailed.stream(), tries, false, null); 525 } 526 }); 527 } 528 529 public List<CompletableFuture<T>> call() { 530 groupAndSend(actions.stream(), 1); 531 return futures; 532 } 533}