/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;

/**
 * Retry caller for batch.
 * <p>
 * Note that {@link #operationTimeoutNs} is now the total time limit for the whole batch, the same
 * as for other single operations.
 * <p>
 * And {@link #maxAttempts} is logically a limit on each single operation in the batch. In the
 * implementation, we record a {@code tries} parameter for each operation group, and if a group is
 * split into several sub groups when retrying, the sub groups inherit the {@code tries}. You can
 * picture the whole retrying process as a tree, with {@link #maxAttempts} limiting the depth of
 * the tree.
 */
@InterfaceAudience.Private
class AsyncBatchRpcRetryingCaller<T> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);

  private final Timer retryTimer;

  private final AsyncConnectionImpl conn;

  private final TableName tableName;

  private final List<Action> actions;

  private final List<CompletableFuture<T>> futures;

  private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;

  private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;

  private final long pauseNs;

  private final int maxAttempts;

  private final long operationTimeoutNs;

  private final long rpcTimeoutNs;

  private final int startLogErrorsCnt;

  private final long startNs;

  // We can not use HRegionLocation as the map key because the hashCode and equals methods of
  // HRegionLocation only consider serverName.
  private static final class RegionRequest {

    public final HRegionLocation loc;

    public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();

    public RegionRequest(HRegionLocation loc) {
      this.loc = loc;
    }
  }

  private static final class ServerRequest {

    public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

    public void addAction(HRegionLocation loc, Action action) {
      computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
        () -> new RegionRequest(loc)).actions.add(action);
    }
  }
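
  // Request grouping: on each attempt, groupAndSend() locates every action's region and buckets
  // it into a ServerRequest (one per region server), which in turn buckets actions into
  // RegionRequests keyed by region name. send() then turns each ServerRequest into a single
  // MultiRequest, so one attempt issues at most one multi() RPC per server.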

  public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
      TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
    this.retryTimer = retryTimer;
    this.conn = conn;
    this.tableName = tableName;
    this.pauseNs = pauseNs;
    this.maxAttempts = maxAttempts;
    this.operationTimeoutNs = operationTimeoutNs;
    this.rpcTimeoutNs = rpcTimeoutNs;
    this.startLogErrorsCnt = startLogErrorsCnt;

    this.actions = new ArrayList<>(actions.size());
    this.futures = new ArrayList<>(actions.size());
    this.action2Future = new IdentityHashMap<>(actions.size());
    for (int i = 0, n = actions.size(); i < n; i++) {
      Row rawAction = actions.get(i);
      Action action = new Action(rawAction, i);
      // Append and Increment are not idempotent, so give them a nonce to guard against
      // re-execution when the operation is retried.
      if (rawAction instanceof Append || rawAction instanceof Increment) {
        action.setNonce(conn.getNonceGenerator().newNonce());
      }
      this.actions.add(action);
      CompletableFuture<T> future = new CompletableFuture<>();
      futures.add(future);
      action2Future.put(action, future);
    }
    this.action2Errors = new IdentityHashMap<>();
    this.startNs = System.nanoTime();
  }

  private long remainingTimeNs() {
    return operationTimeoutNs - (System.nanoTime() - startNs);
  }

  private List<ThrowableWithExtraContext> removeErrors(Action action) {
    synchronized (action2Errors) {
      return action2Errors.remove(action);
    }
  }

  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
      Throwable error, ServerName serverName) {
    if (tries > startLogErrorsCnt) {
      String regions =
        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
          .collect(Collectors.joining(",", "[", "]"));
      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName +
        " failed, tries=" + tries, error);
    }
  }

  private String getExtraContextForError(ServerName serverName) {
    return serverName != null ? serverName.getServerName() : "";
  }
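
  // Error bookkeeping: addError appends a ThrowableWithExtraContext for the action to
  // action2Errors (under synchronization, since RPC callbacks may run on different threads), and
  // removeErrors drains that history when the action is finally failed, so the resulting
  // RetriesExhaustedException carries the errors from all earlier attempts.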

  private void addError(Action action, Throwable error, ServerName serverName) {
    List<ThrowableWithExtraContext> errors;
    synchronized (action2Errors) {
      errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
    }
    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
      getExtraContextForError(serverName)));
  }

  private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
    actions.forEach(action -> addError(action, error, serverName));
  }

  private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
    CompletableFuture<T> future = action2Future.get(action);
    if (future.isDone()) {
      return;
    }
    ThrowableWithExtraContext errorWithCtx =
      new ThrowableWithExtraContext(error, currentTime, extras);
    List<ThrowableWithExtraContext> errors = removeErrors(action);
    if (errors == null) {
      errors = Collections.singletonList(errorWithCtx);
    } else {
      errors.add(errorWithCtx);
    }
    future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
  }

  private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    String extras = getExtraContextForError(serverName);
    actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
  }

  private void failAll(Stream<Action> actions, int tries) {
    actions.forEach(action -> {
      CompletableFuture<T> future = action2Future.get(action);
      if (future.isDone()) {
        return;
      }
      future.completeExceptionally(new RetriesExhaustedException(tries,
        Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
    });
  }
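
  // buildReq() converts a per-region action map into one MultiRequest. Cell payloads are not
  // embedded in the protobuf; buildNoDataRegionActions collects them into the cells list so that
  // send() can ship them through the RPC controller's CellScanner instead.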

  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
      List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
    ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
    for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
      // multiRequestBuilder will be populated with region actions.
      // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
      // action list.
      RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells,
        multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
        rowMutationsIndexMap);
    }
    return multiRequestBuilder.build();
  }

  @SuppressWarnings("unchecked")
  private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
      RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
    Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
    if (result == null) {
      LOG.error("Server " + serverName + " sent us neither result nor exception for row '" +
        Bytes.toStringBinary(action.getAction().getRow()) + "' of " +
        regionReq.loc.getRegion().getRegionNameAsString());
      addError(action, new RuntimeException("Invalid response"), serverName);
      failedActions.add(action);
    } else if (result instanceof Throwable) {
      Throwable error = translateException((Throwable) result);
      logException(tries, () -> Stream.of(regionReq), error, serverName);
      conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
      if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
        failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
          getExtraContextForError(serverName));
      } else {
        failedActions.add(action);
      }
    } else {
      action2Future.get(action).complete((T) result);
    }
  }

  private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
      ServerName serverName, MultiResponse resp) {
    List<Action> failedActions = new ArrayList<>();
    actionsByRegion.forEach((rn, regionReq) -> {
      RegionResult regionResult = resp.getResults().get(rn);
      Throwable regionException = resp.getException(rn);
      if (regionResult != null) {
        regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
          regionResult, failedActions, regionException));
      } else {
        Throwable error;
        if (regionException == null) {
          LOG
            .error("Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
          error = new RuntimeException("Invalid response");
        } else {
          error = translateException(regionException);
        }
        logException(tries, () -> Stream.of(regionReq), error, serverName);
        conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
        if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
          failAll(regionReq.actions.stream(), tries, error, serverName);
          return;
        }
        addError(regionReq.actions, error, serverName);
        failedActions.addAll(regionReq.actions);
      }
    });
    if (!failedActions.isEmpty()) {
      tryResubmit(failedActions.stream(), tries);
    }
  }
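
  // send() issues one multi() RPC per server: it checks the remaining operation timeout, builds
  // the MultiRequest for each server, attaches any collected cells to the controller, and routes
  // the response to onComplete() or any failure to onError().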

  private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) {
    long remainingNs;
    if (operationTimeoutNs > 0) {
      remainingNs = remainingTimeNs();
      if (remainingNs <= 0) {
        failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
          .flatMap(r -> r.actions.stream()), tries);
        return;
      }
    } else {
      remainingNs = Long.MAX_VALUE;
    }
    actionsByServer.forEach((sn, serverReq) -> {
      ClientService.Interface stub;
      try {
        stub = conn.getRegionServerStub(sn);
      } catch (IOException e) {
        onError(serverReq.actionsByRegion, tries, e, sn);
        return;
      }
      ClientProtos.MultiRequest req;
      List<CellScannable> cells = new ArrayList<>();
      // Map from a created RegionAction to the original index for a RowMutations within
      // the original list of actions. This will be used to process the results when there
      // is RowMutations in the action list.
      Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
      try {
        req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
      } catch (IOException e) {
        onError(serverReq.actionsByRegion, tries, e, sn);
        return;
      }
      HBaseRpcController controller = conn.rpcControllerFactory.newController();
      resetController(controller, Math.min(rpcTimeoutNs, remainingNs));
      if (!cells.isEmpty()) {
        controller.setCellScanner(createCellScanner(cells));
      }
      stub.multi(controller, req, resp -> {
        if (controller.failed()) {
          onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
        } else {
          try {
            onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,
              rowMutationsIndexMap, resp, controller.cellScanner()));
          } catch (Exception e) {
            onError(serverReq.actionsByRegion, tries, e, sn);
            return;
          }
        }
      });
    });
  }

  private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
      ServerName serverName) {
    Throwable error = translateException(t);
    logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
    actionsByRegion.forEach(
      (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
      failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
        serverName);
      return;
    }
    List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
      .collect(Collectors.toList());
    addError(copiedActions, error, serverName);
    tryResubmit(copiedActions.stream(), tries);
  }

  private void tryResubmit(Stream<Action> actions, int tries) {
    long delayNs;
    if (operationTimeoutNs > 0) {
      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
      if (maxDelayNs <= 0) {
        failAll(actions, tries);
        return;
      }
      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
    } else {
      delayNs = getPauseTime(pauseNs, tries - 1);
    }
    retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
  }

  private void groupAndSend(Stream<Action> actions, int tries) {
    long locateTimeoutNs;
    if (operationTimeoutNs > 0) {
      locateTimeoutNs = remainingTimeNs();
      if (locateTimeoutNs <= 0) {
        failAll(actions, tries);
        return;
      }
    } else {
      locateTimeoutNs = -1L;
    }
    ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
    ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
    addListener(CompletableFuture.allOf(actions
      .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
        RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
          if (error != null) {
            error = unwrapCompletionException(translateException(error));
            if (error instanceof DoNotRetryIOException) {
              failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
              return;
            }
            addError(action, error, null);
            locateFailed.add(action);
          } else {
            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
              action);
          }
        }))
      .toArray(CompletableFuture[]::new)), (v, r) -> {
        if (!actionsByServer.isEmpty()) {
          send(actionsByServer, tries);
        }
        if (!locateFailed.isEmpty()) {
          tryResubmit(locateFailed.stream(), tries);
        }
      });
  }
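
  /**
   * Start the first attempt and return one future per input action, in the same order as the
   * {@code actions} list passed to the constructor.
   * <p>
   * A minimal usage sketch (normally this caller is created by the async connection internals
   * rather than by hand; the parameter values below are illustrative only):
   *
   * <pre>{@code
   * AsyncBatchRpcRetryingCaller<Object> caller = new AsyncBatchRpcRetryingCaller<>(retryTimer,
   *   conn, tableName, actions, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
   *   startLogErrorsCnt);
   * List<CompletableFuture<Object>> futures = caller.call();
   * }</pre>
   */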
  public List<CompletableFuture<T>> call() {
    groupAndSend(actions.stream(), 1);
    return futures;
  }
}