/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;

/**
 * Retry caller for batch.
 * <p>
 * Notice that the {@link #operationTimeoutNs} is now the total time limit, the same as for other
 * single operations.
 * <p>
 * The {@link #maxAttempts} is logically a limit for each single operation in the batch. In the
 * implementation, we record a {@code tries} parameter for each operation group, and if a group is
 * split into several groups when retrying, the sub groups inherit the {@code tries}. You can
 * imagine the whole retrying process as a tree, where {@link #maxAttempts} is the limit on the
 * depth of the tree.
 */
@InterfaceAudience.Private
class AsyncBatchRpcRetryingCaller<T> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);

  /** Timer used to schedule delayed sends and delayed retries. */
  private final Timer retryTimer;

  private final AsyncConnectionImpl conn;

  private final TableName tableName;

  /** The wrapped actions, in the same order as the rows passed to the constructor. */
  private final List<Action> actions;

  /** One future per action, in the same order as {@link #actions}; returned by {@link #call()}. */
  private final List<CompletableFuture<T>> futures;

  /** Identity-keyed lookup from an action to the future that reports its outcome. */
  private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;

  /**
   * Errors collected so far for actions that are still being retried. Map lookups and removals
   * are done while synchronized on this map, see {@link #addError(Action, Throwable, ServerName)}
   * and {@link #removeErrors(Action)}.
   */
  private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;

  private final long pauseNs;

  /** Pause base used instead of {@link #pauseNs} when the failure says the server is overloaded. */
  private final long pauseNsForServerOverloaded;

  private final int maxAttempts;

  /** Total time budget for the whole batch; a value {@code <= 0} disables the deadline checks. */
  private final long operationTimeoutNs;

  private final long rpcTimeoutNs;

  /** Failures are only logged once {@code tries} is greater than this value. */
  private final int startLogErrorsCnt;

  /** {@link System#nanoTime()} captured at construction, the origin of the operation timeout. */
  private final long startNs;

  // we can not use HRegionLocation as the map key because the hashCode and equals method of
  // HRegionLocation only consider serverName.
  private static final class RegionRequest {

    public final HRegionLocation loc;

    public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();

    public RegionRequest(HRegionLocation loc) {
      this.loc = loc;
    }
  }

  /** All the actions that are going to be sent to one server, grouped by region name. */
  private static final class ServerRequest {

    public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

    /** Adds the action to the request of the region in {@code loc}, creating it if needed. */
    public void addAction(HRegionLocation loc, Action action) {
      computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
        () -> new RegionRequest(loc)).actions.add(action);
    }

    public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
      actionsByRegion.put(regionName, regionReq);
    }

    /**
     * Returns the highest priority among all contained actions, or
     * {@link HConstants#PRIORITY_UNSET} if there is no action at all.
     */
    public int getPriority() {
      return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
        .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
    }
  }

  /**
   * Wraps every row into an {@link Action} that remembers its index in {@code actions}, and
   * creates one future per action.
   * <p>
   * Rows that are {@link OperationWithAttributes} carry their priority into the action, and rows
   * that contain an {@link Append} or {@link Increment} get a fresh nonce from the connection's
   * nonce generator. The operation timeout starts counting from the end of this constructor.
   */
  public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
    TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded,
    int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
    this.retryTimer = retryTimer;
    this.conn = conn;
    this.tableName = tableName;
    this.pauseNs = pauseNs;
    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
    this.maxAttempts = maxAttempts;
    this.operationTimeoutNs = operationTimeoutNs;
    this.rpcTimeoutNs = rpcTimeoutNs;
    this.startLogErrorsCnt = startLogErrorsCnt;
    this.actions = new ArrayList<>(actions.size());
    this.futures = new ArrayList<>(actions.size());
    this.action2Future = new IdentityHashMap<>(actions.size());
    for (int i = 0, n = actions.size(); i < n; i++) {
      Row rawAction = actions.get(i);
      Action action;
      if (rawAction instanceof OperationWithAttributes) {
        action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority());
      } else {
        action = new Action(rawAction, i);
      }
      if (hasIncrementOrAppend(rawAction)) {
        action.setNonce(conn.getNonceGenerator().newNonce());
      }
      this.actions.add(action);
      CompletableFuture<T> future = new CompletableFuture<>();
      futures.add(future);
      action2Future.put(action, future);
    }
    this.action2Errors = new IdentityHashMap<>();
    this.startNs = System.nanoTime();
  }

  /**
   * Returns whether the row is an {@link Append} or {@link Increment}, or is a
   * {@link RowMutations} or {@link CheckAndMutate} that contains one.
   */
  private static boolean hasIncrementOrAppend(Row action) {
    if (action instanceof Append || action instanceof Increment) {
      return true;
    } else if (action instanceof RowMutations) {
      return hasIncrementOrAppend((RowMutations) action);
    } else if (action instanceof CheckAndMutate) {
      return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
    }
    return false;
  }

  /** Returns whether any of the contained mutations is an {@link Append} or {@link Increment}. */
  private static boolean hasIncrementOrAppend(RowMutations mutations) {
    for (Mutation mutation : mutations.getMutations()) {
      if (mutation instanceof Append || mutation instanceof Increment) {
        return true;
      }
    }
    return false;
  }

  /** Returns what is left of {@link #operationTimeoutNs}; not positive once it has elapsed. */
  private long remainingTimeNs() {
    return operationTimeoutNs - (System.nanoTime() - startNs);
  }

  /** Removes and returns the errors recorded for the action, or {@code null} if there are none. */
  private List<ThrowableWithExtraContext> removeErrors(Action action) {
    synchronized (action2Errors) {
      return action2Errors.remove(action);
    }
  }

  /**
   * Logs the failure of a batch at WARN level, but only once {@code tries} is greater than
   * {@link #startLogErrorsCnt}. The regions are supplied lazily so that the region list is not
   * built when nothing is logged.
   */
  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
    Throwable error, ServerName serverName) {
    if (tries > startLogErrorsCnt) {
      String regions =
        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
          .collect(Collectors.joining(",", "[", "]"));
      // the trailing throwable is not bound to a placeholder, so slf4j logs it as the exception
      LOG.warn("Process batch for {} in {} from {} failed, tries={}", regions, tableName,
        serverName, tries, error);
    }
  }

  /** Returns the server name string, or an empty string if {@code serverName} is null. */
  private String getExtraContextForError(ServerName serverName) {
    return serverName != null ? serverName.getServerName() : "";
  }

  /**
   * Records the error, together with the current time and the server it came from, in the error
   * list of the action. The list is created on first use.
   */
  private void addError(Action action, Throwable error, ServerName serverName) {
    List<ThrowableWithExtraContext> errors;
    synchronized (action2Errors) {
      errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
    }
    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
      getExtraContextForError(serverName)));
  }

  /** Records the same error for every given action. */
  private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
    actions.forEach(action -> addError(action, error, serverName));
  }

  /**
   * Completes the future of the action exceptionally with a {@link RetriesExhaustedException}
   * that carries the previously recorded errors plus the given one. Does nothing if the future is
   * already done.
   */
  private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
    CompletableFuture<T> future = action2Future.get(action);
    if (future.isDone()) {
      return;
    }
    ThrowableWithExtraContext errorWithCtx =
      new ThrowableWithExtraContext(error, currentTime, extras);
    List<ThrowableWithExtraContext> errors = removeErrors(action);
    if (errors == null) {
      errors = Collections.singletonList(errorWithCtx);
    } else {
      errors.add(errorWithCtx);
    }
    future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
  }

  /** Fails every given action with the same error, using one shared timestamp and context. */
  private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    String extras = getExtraContextForError(serverName);
    actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
  }

  /**
   * Fails every given action that is not done yet with a {@link RetriesExhaustedException} built
   * from the errors recorded so far, which may be none. Used when there is no new error to
   * report, i.e. the operation timeout has run out.
   */
  private void failAll(Stream<Action> actions, int tries) {
    actions.forEach(action -> {
      CompletableFuture<T> future = action2Future.get(action);
      if (future.isDone()) {
        return;
      }
      future.completeExceptionally(new RetriesExhaustedException(tries,
        Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
    });
  }

  /**
   * Builds the multi request for one server.
   * @param actionsByRegion the actions to send, grouped by region name
   * @param cells           passed to the request converter to be populated
   * @param indexMap        passed to the request converter to be populated, see the comment in
   *                        the method body
   */
  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
    List<CellScannable> cells, Map<Integer, Integer> indexMap) throws IOException {
    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
    ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
    for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
      // multiRequestBuilder will be populated with region actions.
      // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the
      // action list.
      RequestConverter.buildNoDataRegionActions(entry.getKey(),
        entry.getValue().actions.stream()
          .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex()))
          .collect(Collectors.toList()),
        cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
        indexMap);
    }
    return multiRequestBuilder.build();
  }

  /**
   * Handles the outcome of a single action inside a region result.
   * <p>
   * The outcome is looked up by the original index of the action, falling back to the region
   * level exception. A missing outcome is recorded as an error and the action is queued for
   * retry. A throwable either fails the action (not retryable, or attempts exhausted) or queues
   * it for retry. Anything else completes the future of the action.
   * @param failedActions    output parameter, receives the actions which need to be retried
   * @param retryImmediately output parameter, set to true once a
   *                         {@link RetryImmediatelyException} is seen for a retried action
   */
  @SuppressWarnings("unchecked")
  private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
    RegionResult regionResult, List<Action> failedActions, Throwable regionException,
    MutableBoolean retryImmediately) {
    Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
    if (result == null) {
      LOG.error("Server {} sent us neither result nor exception for row '{}' of {}", serverName,
        Bytes.toStringBinary(action.getAction().getRow()),
        regionReq.loc.getRegion().getRegionNameAsString());
      addError(action, new RuntimeException("Invalid response"), serverName);
      failedActions.add(action);
    } else if (result instanceof Throwable) {
      Throwable error = translateException((Throwable) result);
      logException(tries, () -> Stream.of(regionReq), error, serverName);
      conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
      if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
        failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
          getExtraContextForError(serverName));
      } else {
        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
          retryImmediately.setTrue();
        }
        failedActions.add(action);
      }
    } else {
      action2Future.get(action).complete((T) result);
    }
  }

  /**
   * Handles the response of a multi call: updates the statistics, processes each region, and
   * finally resubmits all the actions which need to be retried as one group.
   */
  private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
    ServerName serverName, MultiResponse resp) {
    ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
      serverName, resp);
    List<Action> failedActions = new ArrayList<>();
    MutableBoolean retryImmediately = new MutableBoolean(false);
    actionsByRegion.forEach((rn, regionReq) -> {
      RegionResult regionResult = resp.getResults().get(rn);
      Throwable regionException = resp.getException(rn);
      if (regionResult != null) {
        regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
          regionResult, failedActions, regionException, retryImmediately));
      } else {
        // no per action results for this region, so all of its actions share the same fate
        Throwable error;
        if (regionException == null) {
          LOG.error("Server sent us neither results nor exceptions for {}",
            Bytes.toStringBinary(rn));
          error = new RuntimeException("Invalid response");
        } else {
          error = translateException(regionException);
        }
        logException(tries, () -> Stream.of(regionReq), error, serverName);
        conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
        if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
          failAll(regionReq.actions.stream(), tries, error, serverName);
          return;
        }
        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
          retryImmediately.setTrue();
        }
        addError(regionReq.actions, error, serverName);
        failedActions.addAll(regionReq.actions);
      }
    });
    if (!failedActions.isEmpty()) {
      tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
    }
  }

  /**
   * Sends all the actions in {@code serverReq} to the given server with a single multi call.
   * <p>
   * The actions are failed right away if the operation timeout has already elapsed. Failing to
   * get the stub or to build the request, a failed rpc and a failure while converting the
   * response all go through
   * {@link #onError(Map, int, Throwable, ServerName)}.
   */
  private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
    long remainingNs;
    if (operationTimeoutNs > 0) {
      remainingNs = remainingTimeNs();
      if (remainingNs <= 0) {
        failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
          tries);
        return;
      }
    } else {
      remainingNs = Long.MAX_VALUE;
    }
    ClientService.Interface stub;
    try {
      stub = conn.getRegionServerStub(serverName);
    } catch (IOException e) {
      onError(serverReq.actionsByRegion, tries, e, serverName);
      return;
    }
    ClientProtos.MultiRequest req;
    List<CellScannable> cells = new ArrayList<>();
    // Map from a created RegionAction to the original index for a RowMutations within
    // the original list of actions. This will be used to process the results when there
    // is RowMutations/CheckAndMutate in the action list.
    Map<Integer, Integer> indexMap = new HashMap<>();
    try {
      req = buildReq(serverReq.actionsByRegion, cells, indexMap);
    } catch (IOException e) {
      onError(serverReq.actionsByRegion, tries, e, serverName);
      return;
    }
    HBaseRpcController controller = conn.rpcControllerFactory.newController();
    // a single rpc is never allowed to run past the end of the whole operation
    resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
      calcPriority(serverReq.getPriority(), tableName));
    if (!cells.isEmpty()) {
      controller.setCellScanner(createCellScanner(cells));
    }
    stub.multi(controller, req, resp -> {
      if (controller.failed()) {
        onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName);
      } else {
        try {
          onComplete(serverReq.actionsByRegion, tries, serverName,
            ResponseConverter.getResults(req, indexMap, resp, controller.cellScanner()));
        } catch (Exception e) {
          onError(serverReq.actionsByRegion, tries, e, serverName);
        }
      }
    });
  }

  // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit,
  // based on the load of the region server and the region.
  /**
   * Sends the requests to the servers, either right away or after a backoff.
   * <p>
   * Without a statistics tracker everything is sent immediately. Otherwise the regions of each
   * server are regrouped by the backoff time the backoff policy returns for them, and every group
   * with a positive backoff is sent from the retry timer after that many milliseconds.
   */
  private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) {
    Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
    Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
    if (!optStats.isPresent()) {
      actionsByServer.forEach((serverName, serverReq) -> {
        metrics.ifPresent(MetricsConnection::incrNormalRunners);
        sendToServer(serverName, serverReq, tries);
      });
      return;
    }
    ServerStatisticTracker stats = optStats.get();
    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
    actionsByServer.forEach((serverName, serverReq) -> {
      ServerStatistics serverStats = stats.getStats(serverName);
      Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
      serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
        long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats);
        groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
          .setRegionRequest(regionName, regionReq);
      });
      groupByBackoff.forEach((backoff, sr) -> {
        if (backoff > 0) {
          metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff));
          retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff,
            TimeUnit.MILLISECONDS);
        } else {
          metrics.ifPresent(MetricsConnection::incrNormalRunners);
          sendToServer(serverName, sr, tries);
        }
      });
    });
  }

  /**
   * Handles a failure that affects the whole request to one server.
   * <p>
   * The cached location of every involved region is updated. Then all the actions are either
   * failed (error is not retryable, or attempts exhausted), or the error is recorded for them and
   * they are resubmitted.
   */
  private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
    ServerName serverName) {
    Throwable error = translateException(t);
    logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
    actionsByRegion.forEach(
      (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
      failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
        serverName);
      return;
    }
    // the actions are needed twice, for recording the error and for the resubmit, and a stream
    // can only be consumed once, so collect them into a list first
    List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
      .collect(Collectors.toList());
    addError(copiedActions, error, serverName);
    tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
      HBaseServerException.isServerOverloaded(error));
  }

  /**
   * Schedules another attempt for the given actions.
   * <p>
   * An immediate retry is sent right away and keeps the current {@code tries}. Otherwise the
   * retry is scheduled on the retry timer with {@code tries + 1}, after a pause which is capped
   * by what is left of the operation timeout. The actions are failed instead if there is no time
   * left.
   * @param isServerOverloaded whether to pause based on {@link #pauseNsForServerOverloaded}
   *                           rather than {@link #pauseNs}
   */
  private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
    boolean isServerOverloaded) {
    if (immediately) {
      groupAndSend(actions, tries);
      return;
    }
    long delayNs;
    long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
    if (operationTimeoutNs > 0) {
      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
      if (maxDelayNs <= 0) {
        failAll(actions, tries);
        return;
      }
      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
    } else {
      delayNs = getPauseTime(pauseNsToUse, tries - 1);
    }
    retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
  }

  /**
   * Locates the region of every action, groups the actions by server and region, and sends them.
   * <p>
   * An action whose region can not be located is failed if the error is a
   * {@link DoNotRetryIOException}, otherwise the error is recorded and the action is resubmitted
   * once all the locate calls have finished. All the actions are failed up front if the operation
   * timeout has already elapsed.
   */
  private void groupAndSend(Stream<Action> actions, int tries) {
    long locateTimeoutNs;
    if (operationTimeoutNs > 0) {
      locateTimeoutNs = remainingTimeNs();
      if (locateTimeoutNs <= 0) {
        failAll(actions, tries);
        return;
      }
    } else {
      locateTimeoutNs = -1L;
    }
    // both collections are concurrent, they are filled from the locate callbacks
    ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
    ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
    addListener(CompletableFuture.allOf(actions
      .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
        RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
          if (error != null) {
            error = unwrapCompletionException(translateException(error));
            if (error instanceof DoNotRetryIOException) {
              failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
              return;
            }
            addError(action, error, null);
            locateFailed.add(action);
          } else {
            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
              action);
          }
        }))
      .toArray(CompletableFuture[]::new)), (v, r) -> {
        // the outcome of allOf is ignored on purpose, every locate result has already been
        // handled by the per action callback above
        if (!actionsByServer.isEmpty()) {
          sendOrDelay(actionsByServer, tries);
        }
        if (!locateFailed.isEmpty()) {
          tryResubmit(locateFailed.stream(), tries, false, false);
        }
      });
  }

  /**
   * Starts the first attempt for all the actions.
   * @return the futures of the actions, in the same order as the rows passed to the constructor
   */
  public List<CompletableFuture<T>> call() {
    groupAndSend(actions.stream(), 1);
    return futures;
  }
}