001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.CellUtil.createCellScanner; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 026import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 027import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 028import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 029 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.Collections; 033import java.util.HashMap; 034import java.util.IdentityHashMap; 035import java.util.List; 036import java.util.Map; 037import java.util.Optional; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentLinkedQueue; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.ConcurrentSkipListMap; 043import java.util.concurrent.TimeUnit; 044import java.util.function.Supplier; 045import java.util.stream.Collectors; 046import java.util.stream.Stream; 047import org.apache.commons.lang3.mutable.MutableBoolean; 048import org.apache.hadoop.hbase.CallQueueTooBigException; 049import org.apache.hadoop.hbase.CellScannable; 050import org.apache.hadoop.hbase.DoNotRetryIOException; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.HRegionLocation; 053import org.apache.hadoop.hbase.RetryImmediatelyException; 054import org.apache.hadoop.hbase.ServerName; 055import org.apache.hadoop.hbase.TableName; 056import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; 057import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; 058import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 059import org.apache.hadoop.hbase.client.backoff.ServerStatistics; 060import org.apache.hadoop.hbase.ipc.HBaseRpcController; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 063import org.apache.yetus.audience.InterfaceAudience; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067import org.apache.hbase.thirdparty.io.netty.util.Timer; 068 069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 070import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 073 074/** 075 * Retry caller for batch. 076 * <p> 077 * Notice that, the {@link #operationTimeoutNs} is the total time limit now which is the same with 078 * other single operations 079 * <p> 080 * And the {@link #maxAttempts} is a limit for each single operation in the batch logically. In the 081 * implementation, we will record a {@code tries} parameter for each operation group, and if it is 082 * split to several groups when retrying, the sub groups will inherit the {@code tries}. You can 083 * imagine that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of 084 * the depth of the tree. 085 */ 086@InterfaceAudience.Private 087class AsyncBatchRpcRetryingCaller<T> { 088 089 private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class); 090 091 private final Timer retryTimer; 092 093 private final AsyncConnectionImpl conn; 094 095 private final TableName tableName; 096 097 private final List<Action> actions; 098 099 private final List<CompletableFuture<T>> futures; 100 101 private final IdentityHashMap<Action, CompletableFuture<T>> action2Future; 102 103 private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors; 104 105 private final long pauseNs; 106 107 private final long pauseForCQTBENs; 108 109 private final int maxAttempts; 110 111 private final long operationTimeoutNs; 112 113 private final long rpcTimeoutNs; 114 115 private final int startLogErrorsCnt; 116 117 private final long startNs; 118 119 // we can not use HRegionLocation as the map key because the hashCode and equals method of 120 // HRegionLocation only consider serverName. 121 private static final class RegionRequest { 122 123 public final HRegionLocation loc; 124 125 public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>(); 126 127 public RegionRequest(HRegionLocation loc) { 128 this.loc = loc; 129 } 130 } 131 132 private static final class ServerRequest { 133 134 public final ConcurrentMap<byte[], RegionRequest> actionsByRegion = 135 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); 136 137 public void addAction(HRegionLocation loc, Action action) { 138 computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(), 139 () -> new RegionRequest(loc)).actions.add(action); 140 } 141 142 public void setRegionRequest(byte[] regionName, RegionRequest regionReq) { 143 actionsByRegion.put(regionName, regionReq); 144 } 145 146 public int getPriority() { 147 return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream()) 148 .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET); 149 } 150 } 151 152 public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, 153 TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs, 154 int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { 155 this.retryTimer = retryTimer; 156 this.conn = conn; 157 this.tableName = tableName; 158 this.pauseNs = pauseNs; 159 this.pauseForCQTBENs = pauseForCQTBENs; 160 this.maxAttempts = maxAttempts; 161 this.operationTimeoutNs = operationTimeoutNs; 162 this.rpcTimeoutNs = rpcTimeoutNs; 163 this.startLogErrorsCnt = startLogErrorsCnt; 164 this.actions = new ArrayList<>(actions.size()); 165 this.futures = new ArrayList<>(actions.size()); 166 this.action2Future = new IdentityHashMap<>(actions.size()); 167 for (int i = 0, n = actions.size(); i < n; i++) { 168 Row rawAction = actions.get(i); 169 Action action; 170 if (rawAction instanceof OperationWithAttributes) { 171 action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority()); 172 } else { 173 action = new Action(rawAction, i); 174 } 175 if (rawAction instanceof Append || rawAction instanceof Increment) { 176 action.setNonce(conn.getNonceGenerator().newNonce()); 177 } 178 this.actions.add(action); 179 CompletableFuture<T> future = new CompletableFuture<>(); 180 futures.add(future); 181 action2Future.put(action, future); 182 } 183 this.action2Errors = new IdentityHashMap<>(); 184 this.startNs = System.nanoTime(); 185 } 186 187 private long remainingTimeNs() { 188 return operationTimeoutNs - (System.nanoTime() - startNs); 189 } 190 191 private List<ThrowableWithExtraContext> removeErrors(Action action) { 192 synchronized (action2Errors) { 193 return action2Errors.remove(action); 194 } 195 } 196 197 private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier, 198 Throwable error, ServerName serverName) { 199 if (tries > startLogErrorsCnt) { 200 String regions = 201 regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'") 202 .collect(Collectors.joining(",", "[", "]")); 203 LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName + 204 " failed, tries=" + tries, error); 205 } 206 } 207 208 private String getExtraContextForError(ServerName serverName) { 209 return serverName != null ? serverName.getServerName() : ""; 210 } 211 212 private void addError(Action action, Throwable error, ServerName serverName) { 213 List<ThrowableWithExtraContext> errors; 214 synchronized (action2Errors) { 215 errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>()); 216 } 217 errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(), 218 getExtraContextForError(serverName))); 219 } 220 221 private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) { 222 actions.forEach(action -> addError(action, error, serverName)); 223 } 224 225 private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) { 226 CompletableFuture<T> future = action2Future.get(action); 227 if (future.isDone()) { 228 return; 229 } 230 ThrowableWithExtraContext errorWithCtx = 231 new ThrowableWithExtraContext(error, currentTime, extras); 232 List<ThrowableWithExtraContext> errors = removeErrors(action); 233 if (errors == null) { 234 errors = Collections.singletonList(errorWithCtx); 235 } else { 236 errors.add(errorWithCtx); 237 } 238 future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors)); 239 } 240 241 private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) { 242 long currentTime = EnvironmentEdgeManager.currentTime(); 243 String extras = getExtraContextForError(serverName); 244 actions.forEach(action -> failOne(action, tries, error, currentTime, extras)); 245 } 246 247 private void failAll(Stream<Action> actions, int tries) { 248 actions.forEach(action -> { 249 CompletableFuture<T> future = action2Future.get(action); 250 if (future.isDone()) { 251 return; 252 } 253 future.completeExceptionally(new RetriesExhaustedException(tries, 254 Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList()))); 255 }); 256 } 257 258 private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion, 259 List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException { 260 ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder(); 261 ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder(); 262 ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); 263 ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); 264 for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) { 265 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 266 // multiRequestBuilder will be populated with region actions. 267 // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the 268 // action list. 269 RequestConverter.buildNoDataRegionActions(entry.getKey(), 270 entry.getValue().actions.stream() 271 .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex())) 272 .collect(Collectors.toList()), 273 cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, 274 rowMutationsIndexMap); 275 } 276 return multiRequestBuilder.build(); 277 } 278 279 @SuppressWarnings("unchecked") 280 private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName, 281 RegionResult regionResult, List<Action> failedActions, Throwable regionException, 282 MutableBoolean retryImmediately) { 283 Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException); 284 if (result == null) { 285 LOG.error("Server " + serverName + " sent us neither result nor exception for row '" + 286 Bytes.toStringBinary(action.getAction().getRow()) + "' of " + 287 regionReq.loc.getRegion().getRegionNameAsString()); 288 addError(action, new RuntimeException("Invalid response"), serverName); 289 failedActions.add(action); 290 } else if (result instanceof Throwable) { 291 Throwable error = translateException((Throwable) result); 292 logException(tries, () -> Stream.of(regionReq), error, serverName); 293 conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); 294 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 295 failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), 296 getExtraContextForError(serverName)); 297 } else { 298 if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) { 299 retryImmediately.setTrue(); 300 } 301 failedActions.add(action); 302 } 303 } else { 304 action2Future.get(action).complete((T) result); 305 } 306 } 307 308 private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries, 309 ServerName serverName, MultiResponse resp) { 310 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), 311 serverName, resp); 312 List<Action> failedActions = new ArrayList<>(); 313 MutableBoolean retryImmediately = new MutableBoolean(false); 314 actionsByRegion.forEach((rn, regionReq) -> { 315 RegionResult regionResult = resp.getResults().get(rn); 316 Throwable regionException = resp.getException(rn); 317 if (regionResult != null) { 318 regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName, 319 regionResult, failedActions, regionException, retryImmediately)); 320 } else { 321 Throwable error; 322 if (regionException == null) { 323 LOG.error("Server sent us neither results nor exceptions for {}", 324 Bytes.toStringBinary(rn)); 325 error = new RuntimeException("Invalid response"); 326 } else { 327 error = translateException(regionException); 328 } 329 logException(tries, () -> Stream.of(regionReq), error, serverName); 330 conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); 331 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 332 failAll(regionReq.actions.stream(), tries, error, serverName); 333 return; 334 } 335 if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) { 336 retryImmediately.setTrue(); 337 } 338 addError(regionReq.actions, error, serverName); 339 failedActions.addAll(regionReq.actions); 340 } 341 }); 342 if (!failedActions.isEmpty()) { 343 tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false); 344 } 345 } 346 347 private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) { 348 long remainingNs; 349 if (operationTimeoutNs > 0) { 350 remainingNs = remainingTimeNs(); 351 if (remainingNs <= 0) { 352 failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), 353 tries); 354 return; 355 } 356 } else { 357 remainingNs = Long.MAX_VALUE; 358 } 359 ClientService.Interface stub; 360 try { 361 stub = conn.getRegionServerStub(serverName); 362 } catch (IOException e) { 363 onError(serverReq.actionsByRegion, tries, e, serverName); 364 return; 365 } 366 ClientProtos.MultiRequest req; 367 List<CellScannable> cells = new ArrayList<>(); 368 // Map from a created RegionAction to the original index for a RowMutations within 369 // the original list of actions. This will be used to process the results when there 370 // is RowMutations in the action list. 371 Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>(); 372 try { 373 req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap); 374 } catch (IOException e) { 375 onError(serverReq.actionsByRegion, tries, e, serverName); 376 return; 377 } 378 HBaseRpcController controller = conn.rpcControllerFactory.newController(); 379 resetController(controller, Math.min(rpcTimeoutNs, remainingNs), 380 calcPriority(serverReq.getPriority(), tableName)); 381 if (!cells.isEmpty()) { 382 controller.setCellScanner(createCellScanner(cells)); 383 } 384 stub.multi(controller, req, resp -> { 385 if (controller.failed()) { 386 onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName); 387 } else { 388 try { 389 onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req, 390 rowMutationsIndexMap, resp, controller.cellScanner())); 391 } catch (Exception e) { 392 onError(serverReq.actionsByRegion, tries, e, serverName); 393 return; 394 } 395 } 396 }); 397 } 398 399 // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit, 400 // based on the load of the region server and the region. 401 private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) { 402 Optional<MetricsConnection> metrics = conn.getConnectionMetrics(); 403 Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker(); 404 if (!optStats.isPresent()) { 405 actionsByServer.forEach((serverName, serverReq) -> { 406 metrics.ifPresent(MetricsConnection::incrNormalRunners); 407 sendToServer(serverName, serverReq, tries); 408 }); 409 return; 410 } 411 ServerStatisticTracker stats = optStats.get(); 412 ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy(); 413 actionsByServer.forEach((serverName, serverReq) -> { 414 ServerStatistics serverStats = stats.getStats(serverName); 415 Map<Long, ServerRequest> groupByBackoff = new HashMap<>(); 416 serverReq.actionsByRegion.forEach((regionName, regionReq) -> { 417 long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats); 418 groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest()) 419 .setRegionRequest(regionName, regionReq); 420 }); 421 groupByBackoff.forEach((backoff, sr) -> { 422 if (backoff > 0) { 423 metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff)); 424 retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff, 425 TimeUnit.MILLISECONDS); 426 } else { 427 metrics.ifPresent(MetricsConnection::incrNormalRunners); 428 sendToServer(serverName, sr, tries); 429 } 430 }); 431 }); 432 } 433 434 private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t, 435 ServerName serverName) { 436 Throwable error = translateException(t); 437 logException(tries, () -> actionsByRegion.values().stream(), error, serverName); 438 actionsByRegion.forEach( 439 (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error)); 440 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 441 failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error, 442 serverName); 443 return; 444 } 445 List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream()) 446 .collect(Collectors.toList()); 447 addError(copiedActions, error, serverName); 448 tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, 449 error instanceof CallQueueTooBigException); 450 } 451 452 private void tryResubmit(Stream<Action> actions, int tries, boolean immediately, 453 boolean isCallQueueTooBig) { 454 if (immediately) { 455 groupAndSend(actions, tries); 456 return; 457 } 458 long delayNs; 459 long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs; 460 if (operationTimeoutNs > 0) { 461 long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; 462 if (maxDelayNs <= 0) { 463 failAll(actions, tries); 464 return; 465 } 466 delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1)); 467 } else { 468 delayNs = getPauseTime(pauseNsToUse, tries - 1); 469 } 470 retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS); 471 } 472 473 private void groupAndSend(Stream<Action> actions, int tries) { 474 long locateTimeoutNs; 475 if (operationTimeoutNs > 0) { 476 locateTimeoutNs = remainingTimeNs(); 477 if (locateTimeoutNs <= 0) { 478 failAll(actions, tries); 479 return; 480 } 481 } else { 482 locateTimeoutNs = -1L; 483 } 484 ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>(); 485 ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>(); 486 addListener(CompletableFuture.allOf(actions 487 .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), 488 RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { 489 if (error != null) { 490 error = unwrapCompletionException(translateException(error)); 491 if (error instanceof DoNotRetryIOException) { 492 failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), ""); 493 return; 494 } 495 addError(action, error, null); 496 locateFailed.add(action); 497 } else { 498 computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc, 499 action); 500 } 501 })) 502 .toArray(CompletableFuture[]::new)), (v, r) -> { 503 if (!actionsByServer.isEmpty()) { 504 sendOrDelay(actionsByServer, tries); 505 } 506 if (!locateFailed.isEmpty()) { 507 tryResubmit(locateFailed.stream(), tries, false, false); 508 } 509 }); 510 } 511 512 public List<CompletableFuture<T>> call() { 513 groupAndSend(actions.stream(), 1); 514 return futures; 515 } 516}