001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.CellUtil.createCellScanner; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 025import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; 026import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 027import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.Collections; 032import java.util.HashMap; 033import java.util.IdentityHashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Optional; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.ConcurrentMap; 041import java.util.concurrent.ConcurrentSkipListMap; 042import java.util.concurrent.TimeUnit; 043import java.util.function.Supplier; 044import java.util.stream.Collectors; 045import java.util.stream.Stream; 046import org.apache.commons.lang3.mutable.MutableBoolean; 047import org.apache.hadoop.hbase.CellScannable; 048import org.apache.hadoop.hbase.DoNotRetryIOException; 049import org.apache.hadoop.hbase.HRegionLocation; 050import org.apache.hadoop.hbase.RetryImmediatelyException; 051import org.apache.hadoop.hbase.ServerName; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; 054import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; 055import org.apache.hadoop.hbase.ipc.HBaseRpcController; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.apache.yetus.audience.InterfaceAudience; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.io.netty.util.Timer; 063 064import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 065import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 068 069/** 070 * Retry caller for batch. 071 * <p> 072 * Notice that, the {@link #operationTimeoutNs} is the total time limit now which is the same with 073 * other single operations 074 * <p> 075 * And the {@link #maxAttempts} is a limit for each single operation in the batch logically. In the 076 * implementation, we will record a {@code tries} parameter for each operation group, and if it is 077 * split to several groups when retrying, the sub groups will inherit the {@code tries}. You can 078 * imagine that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of 079 * the depth of the tree. 080 */ 081@InterfaceAudience.Private 082class AsyncBatchRpcRetryingCaller<T> { 083 084 private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class); 085 086 private final Timer retryTimer; 087 088 private final AsyncConnectionImpl conn; 089 090 private final TableName tableName; 091 092 private final List<Action> actions; 093 094 private final List<CompletableFuture<T>> futures; 095 096 private final IdentityHashMap<Action, CompletableFuture<T>> action2Future; 097 098 private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors; 099 100 private final long pauseNs; 101 102 private final int maxAttempts; 103 104 private final long operationTimeoutNs; 105 106 private final long rpcTimeoutNs; 107 108 private final int startLogErrorsCnt; 109 110 private final long startNs; 111 112 // we can not use HRegionLocation as the map key because the hashCode and equals method of 113 // HRegionLocation only consider serverName. 114 private static final class RegionRequest { 115 116 public final HRegionLocation loc; 117 118 public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>(); 119 120 public RegionRequest(HRegionLocation loc) { 121 this.loc = loc; 122 } 123 } 124 125 private static final class ServerRequest { 126 127 public final ConcurrentMap<byte[], RegionRequest> actionsByRegion = 128 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); 129 130 public void addAction(HRegionLocation loc, Action action) { 131 computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(), 132 () -> new RegionRequest(loc)).actions.add(action); 133 } 134 } 135 136 public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, 137 TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts, 138 long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { 139 this.retryTimer = retryTimer; 140 this.conn = conn; 141 this.tableName = tableName; 142 this.pauseNs = pauseNs; 143 this.maxAttempts = maxAttempts; 144 this.operationTimeoutNs = operationTimeoutNs; 145 this.rpcTimeoutNs = rpcTimeoutNs; 146 this.startLogErrorsCnt = startLogErrorsCnt; 147 148 this.actions = new ArrayList<>(actions.size()); 149 this.futures = new ArrayList<>(actions.size()); 150 this.action2Future = new IdentityHashMap<>(actions.size()); 151 for (int i = 0, n = actions.size(); i < n; i++) { 152 Row rawAction = actions.get(i); 153 Action action = new Action(rawAction, i); 154 if (rawAction instanceof Append || rawAction instanceof Increment) { 155 action.setNonce(conn.getNonceGenerator().newNonce()); 156 } 157 this.actions.add(action); 158 CompletableFuture<T> future = new CompletableFuture<>(); 159 futures.add(future); 160 action2Future.put(action, future); 161 } 162 this.action2Errors = new IdentityHashMap<>(); 163 this.startNs = System.nanoTime(); 164 } 165 166 private long remainingTimeNs() { 167 return operationTimeoutNs - (System.nanoTime() - startNs); 168 } 169 170 private List<ThrowableWithExtraContext> removeErrors(Action action) { 171 synchronized (action2Errors) { 172 return action2Errors.remove(action); 173 } 174 } 175 176 private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier, 177 Throwable error, ServerName serverName) { 178 if (tries > startLogErrorsCnt) { 179 String regions = 180 regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'") 181 .collect(Collectors.joining(",", "[", "]")); 182 LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName + 183 " failed, tries=" + tries, error); 184 } 185 } 186 187 private String getExtraContextForError(ServerName serverName) { 188 return serverName != null ? serverName.getServerName() : ""; 189 } 190 191 private void addError(Action action, Throwable error, ServerName serverName) { 192 List<ThrowableWithExtraContext> errors; 193 synchronized (action2Errors) { 194 errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>()); 195 } 196 errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(), 197 getExtraContextForError(serverName))); 198 } 199 200 private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) { 201 actions.forEach(action -> addError(action, error, serverName)); 202 } 203 204 private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) { 205 CompletableFuture<T> future = action2Future.get(action); 206 if (future.isDone()) { 207 return; 208 } 209 ThrowableWithExtraContext errorWithCtx = 210 new ThrowableWithExtraContext(error, currentTime, extras); 211 List<ThrowableWithExtraContext> errors = removeErrors(action); 212 if (errors == null) { 213 errors = Collections.singletonList(errorWithCtx); 214 } else { 215 errors.add(errorWithCtx); 216 } 217 future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors)); 218 } 219 220 private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) { 221 long currentTime = EnvironmentEdgeManager.currentTime(); 222 String extras = getExtraContextForError(serverName); 223 actions.forEach(action -> failOne(action, tries, error, currentTime, extras)); 224 } 225 226 private void failAll(Stream<Action> actions, int tries) { 227 actions.forEach(action -> { 228 CompletableFuture<T> future = action2Future.get(action); 229 if (future.isDone()) { 230 return; 231 } 232 future.completeExceptionally(new RetriesExhaustedException(tries, 233 Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList()))); 234 }); 235 } 236 237 private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion, 238 List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException { 239 ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder(); 240 ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder(); 241 ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); 242 ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); 243 for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) { 244 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 245 // multiRequestBuilder will be populated with region actions. 246 // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the 247 // action list. 248 RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells, 249 multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, 250 rowMutationsIndexMap); 251 } 252 return multiRequestBuilder.build(); 253 } 254 255 @SuppressWarnings("unchecked") 256 private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName, 257 RegionResult regionResult, List<Action> failedActions, Throwable regionException, 258 MutableBoolean retryImmediately) { 259 Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException); 260 if (result == null) { 261 LOG.error("Server " + serverName + " sent us neither result nor exception for row '" + 262 Bytes.toStringBinary(action.getAction().getRow()) + "' of " + 263 regionReq.loc.getRegion().getRegionNameAsString()); 264 addError(action, new RuntimeException("Invalid response"), serverName); 265 failedActions.add(action); 266 } else if (result instanceof Throwable) { 267 Throwable error = translateException((Throwable) result); 268 logException(tries, () -> Stream.of(regionReq), error, serverName); 269 conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); 270 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 271 failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), 272 getExtraContextForError(serverName)); 273 } else { 274 if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) { 275 retryImmediately.setTrue(); 276 } 277 failedActions.add(action); 278 } 279 } else { 280 action2Future.get(action).complete((T) result); 281 } 282 } 283 284 private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries, 285 ServerName serverName, MultiResponse resp) { 286 List<Action> failedActions = new ArrayList<>(); 287 MutableBoolean retryImmediately = new MutableBoolean(false); 288 actionsByRegion.forEach((rn, regionReq) -> { 289 RegionResult regionResult = resp.getResults().get(rn); 290 Throwable regionException = resp.getException(rn); 291 if (regionResult != null) { 292 regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName, 293 regionResult, failedActions, regionException, retryImmediately)); 294 } else { 295 Throwable error; 296 if (regionException == null) { 297 LOG.error("Server sent us neither results nor exceptions for {}", 298 Bytes.toStringBinary(rn)); 299 error = new RuntimeException("Invalid response"); 300 } else { 301 error = translateException(regionException); 302 } 303 logException(tries, () -> Stream.of(regionReq), error, serverName); 304 conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); 305 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 306 failAll(regionReq.actions.stream(), tries, error, serverName); 307 return; 308 } 309 if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) { 310 retryImmediately.setTrue(); 311 } 312 addError(regionReq.actions, error, serverName); 313 failedActions.addAll(regionReq.actions); 314 } 315 }); 316 if (!failedActions.isEmpty()) { 317 tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue()); 318 } 319 } 320 321 private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) { 322 long remainingNs; 323 if (operationTimeoutNs > 0) { 324 remainingNs = remainingTimeNs(); 325 if (remainingNs <= 0) { 326 failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream()) 327 .flatMap(r -> r.actions.stream()), tries); 328 return; 329 } 330 } else { 331 remainingNs = Long.MAX_VALUE; 332 } 333 actionsByServer.forEach((sn, serverReq) -> { 334 ClientService.Interface stub; 335 try { 336 stub = conn.getRegionServerStub(sn); 337 } catch (IOException e) { 338 onError(serverReq.actionsByRegion, tries, e, sn); 339 return; 340 } 341 ClientProtos.MultiRequest req; 342 List<CellScannable> cells = new ArrayList<>(); 343 // Map from a created RegionAction to the original index for a RowMutations within 344 // the original list of actions. This will be used to process the results when there 345 // is RowMutations in the action list. 346 Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>(); 347 try { 348 req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap); 349 } catch (IOException e) { 350 onError(serverReq.actionsByRegion, tries, e, sn); 351 return; 352 } 353 HBaseRpcController controller = conn.rpcControllerFactory.newController(); 354 resetController(controller, Math.min(rpcTimeoutNs, remainingNs)); 355 if (!cells.isEmpty()) { 356 controller.setCellScanner(createCellScanner(cells)); 357 } 358 stub.multi(controller, req, resp -> { 359 if (controller.failed()) { 360 onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn); 361 } else { 362 try { 363 onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req, 364 rowMutationsIndexMap, resp, controller.cellScanner())); 365 } catch (Exception e) { 366 onError(serverReq.actionsByRegion, tries, e, sn); 367 return; 368 } 369 } 370 }); 371 }); 372 } 373 374 private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t, 375 ServerName serverName) { 376 Throwable error = translateException(t); 377 logException(tries, () -> actionsByRegion.values().stream(), error, serverName); 378 actionsByRegion.forEach( 379 (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error)); 380 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { 381 failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error, 382 serverName); 383 return; 384 } 385 List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream()) 386 .collect(Collectors.toList()); 387 addError(copiedActions, error, serverName); 388 tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException); 389 } 390 391 private void tryResubmit(Stream<Action> actions, int tries, boolean immediately) { 392 if (immediately) { 393 groupAndSend(actions, tries); 394 return; 395 } 396 long delayNs; 397 if (operationTimeoutNs > 0) { 398 long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; 399 if (maxDelayNs <= 0) { 400 failAll(actions, tries); 401 return; 402 } 403 delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); 404 } else { 405 delayNs = getPauseTime(pauseNs, tries - 1); 406 } 407 retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS); 408 } 409 410 private void groupAndSend(Stream<Action> actions, int tries) { 411 long locateTimeoutNs; 412 if (operationTimeoutNs > 0) { 413 locateTimeoutNs = remainingTimeNs(); 414 if (locateTimeoutNs <= 0) { 415 failAll(actions, tries); 416 return; 417 } 418 } else { 419 locateTimeoutNs = -1L; 420 } 421 ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>(); 422 ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>(); 423 addListener(CompletableFuture.allOf(actions 424 .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), 425 RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { 426 if (error != null) { 427 error = unwrapCompletionException(translateException(error)); 428 if (error instanceof DoNotRetryIOException) { 429 failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), ""); 430 return; 431 } 432 addError(action, error, null); 433 locateFailed.add(action); 434 } else { 435 computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc, 436 action); 437 } 438 })) 439 .toArray(CompletableFuture[]::new)), (v, r) -> { 440 if (!actionsByServer.isEmpty()) { 441 send(actionsByServer, tries); 442 } 443 if (!locateFailed.isEmpty()) { 444 tryResubmit(locateFailed.stream(), tries, false); 445 } 446 }); 447 } 448 449 public List<CompletableFuture<T>> call() { 450 groupAndSend(actions.stream(), 1); 451 return futures; 452 } 453}