001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
026import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
027import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
028import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.Collections;
033import java.util.HashMap;
034import java.util.IdentityHashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.Optional;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentLinkedQueue;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.ConcurrentSkipListMap;
043import java.util.concurrent.TimeUnit;
044import java.util.function.Supplier;
045import java.util.stream.Collectors;
046import java.util.stream.Stream;
047import org.apache.commons.lang3.mutable.MutableBoolean;
048import org.apache.hadoop.hbase.CellScannable;
049import org.apache.hadoop.hbase.DoNotRetryIOException;
050import org.apache.hadoop.hbase.HBaseServerException;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.HRegionLocation;
053import org.apache.hadoop.hbase.RetryImmediatelyException;
054import org.apache.hadoop.hbase.ServerName;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
057import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
058import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
059import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
060import org.apache.hadoop.hbase.ipc.HBaseRpcController;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
063import org.apache.yetus.audience.InterfaceAudience;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import org.apache.hbase.thirdparty.io.netty.util.Timer;
068
069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
070import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
073
/**
 * Retry caller for batch.
 * <p>
 * Notice that the {@link #operationTimeoutNs} is now the total time limit, which is the same as
 * for other single operations.
 * <p>
 * The {@link #maxAttempts} is logically a limit for each single operation in the batch. In the
 * implementation, we record a {@code tries} parameter for each operation group, and if a group is
 * split into several groups when retrying, the sub groups inherit the {@code tries}. You can
 * imagine the whole retrying process as a tree, where {@link #maxAttempts} is the limit of the
 * depth of the tree.
 */
086@InterfaceAudience.Private
087class AsyncBatchRpcRetryingCaller<T> {
088
089  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);
090
091  private final Timer retryTimer;
092
093  private final AsyncConnectionImpl conn;
094
095  private final TableName tableName;
096
097  private final List<Action> actions;
098
099  private final List<CompletableFuture<T>> futures;
100
101  private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;
102
103  private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
104
105  private final long pauseNs;
106
107  private final long pauseNsForServerOverloaded;
108
109  private final int maxAttempts;
110
111  private final long operationTimeoutNs;
112
113  private final long rpcTimeoutNs;
114
115  private final int startLogErrorsCnt;
116
117  private final long startNs;
118
119  // we can not use HRegionLocation as the map key because the hashCode and equals method of
120  // HRegionLocation only consider serverName.
121  private static final class RegionRequest {
122
123    public final HRegionLocation loc;
124
125    public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();
126
127    public RegionRequest(HRegionLocation loc) {
128      this.loc = loc;
129    }
130  }
131
132  private static final class ServerRequest {
133
134    public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
135      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
136
137    public void addAction(HRegionLocation loc, Action action) {
138      computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
139        () -> new RegionRequest(loc)).actions.add(action);
140    }
141
142    public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
143      actionsByRegion.put(regionName, regionReq);
144    }
145
146    public int getPriority() {
147      return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
148        .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
149    }
150  }
151
152  public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
153    TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded,
154    int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
155    this.retryTimer = retryTimer;
156    this.conn = conn;
157    this.tableName = tableName;
158    this.pauseNs = pauseNs;
159    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
160    this.maxAttempts = maxAttempts;
161    this.operationTimeoutNs = operationTimeoutNs;
162    this.rpcTimeoutNs = rpcTimeoutNs;
163    this.startLogErrorsCnt = startLogErrorsCnt;
164    this.actions = new ArrayList<>(actions.size());
165    this.futures = new ArrayList<>(actions.size());
166    this.action2Future = new IdentityHashMap<>(actions.size());
167    for (int i = 0, n = actions.size(); i < n; i++) {
168      Row rawAction = actions.get(i);
169      Action action;
170      if (rawAction instanceof OperationWithAttributes) {
171        action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority());
172      } else {
173        action = new Action(rawAction, i);
174      }
175      if (hasIncrementOrAppend(rawAction)) {
176        action.setNonce(conn.getNonceGenerator().newNonce());
177      }
178      this.actions.add(action);
179      CompletableFuture<T> future = new CompletableFuture<>();
180      futures.add(future);
181      action2Future.put(action, future);
182    }
183    this.action2Errors = new IdentityHashMap<>();
184    this.startNs = System.nanoTime();
185  }
186
187  private static boolean hasIncrementOrAppend(Row action) {
188    if (action instanceof Append || action instanceof Increment) {
189      return true;
190    } else if (action instanceof RowMutations) {
191      return hasIncrementOrAppend((RowMutations) action);
192    } else if (action instanceof CheckAndMutate) {
193      return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
194    }
195    return false;
196  }
197
198  private static boolean hasIncrementOrAppend(RowMutations mutations) {
199    for (Mutation mutation : mutations.getMutations()) {
200      if (mutation instanceof Append || mutation instanceof Increment) {
201        return true;
202      }
203    }
204    return false;
205  }
206
207  private long remainingTimeNs() {
208    return operationTimeoutNs - (System.nanoTime() - startNs);
209  }
210
211  private List<ThrowableWithExtraContext> removeErrors(Action action) {
212    synchronized (action2Errors) {
213      return action2Errors.remove(action);
214    }
215  }
216
217  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
218    Throwable error, ServerName serverName) {
219    if (tries > startLogErrorsCnt) {
220      String regions =
221        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
222          .collect(Collectors.joining(",", "[", "]"));
223      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName
224        + " failed, tries=" + tries, error);
225    }
226  }
227
228  private String getExtraContextForError(ServerName serverName) {
229    return serverName != null ? serverName.getServerName() : "";
230  }
231
232  private void addError(Action action, Throwable error, ServerName serverName) {
233    List<ThrowableWithExtraContext> errors;
234    synchronized (action2Errors) {
235      errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
236    }
237    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
238      getExtraContextForError(serverName)));
239  }
240
241  private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
242    actions.forEach(action -> addError(action, error, serverName));
243  }
244
245  private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
246    CompletableFuture<T> future = action2Future.get(action);
247    if (future.isDone()) {
248      return;
249    }
250    ThrowableWithExtraContext errorWithCtx =
251      new ThrowableWithExtraContext(error, currentTime, extras);
252    List<ThrowableWithExtraContext> errors = removeErrors(action);
253    if (errors == null) {
254      errors = Collections.singletonList(errorWithCtx);
255    } else {
256      errors.add(errorWithCtx);
257    }
258    future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
259  }
260
261  private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
262    long currentTime = EnvironmentEdgeManager.currentTime();
263    String extras = getExtraContextForError(serverName);
264    actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
265  }
266
267  private void failAll(Stream<Action> actions, int tries) {
268    actions.forEach(action -> {
269      CompletableFuture<T> future = action2Future.get(action);
270      if (future.isDone()) {
271        return;
272      }
273      future.completeExceptionally(new RetriesExhaustedException(tries,
274        Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
275    });
276  }
277
278  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
279    List<CellScannable> cells, Map<Integer, Integer> indexMap) throws IOException {
280    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
281    ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
282    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
283    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
284    for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
285      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
286      // multiRequestBuilder will be populated with region actions.
287      // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the
288      // action list.
289      RequestConverter.buildNoDataRegionActions(entry.getKey(),
290        entry.getValue().actions.stream()
291          .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex()))
292          .collect(Collectors.toList()),
293        cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
294        indexMap);
295    }
296    return multiRequestBuilder.build();
297  }
298
299  @SuppressWarnings("unchecked")
300  private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
301    RegionResult regionResult, List<Action> failedActions, Throwable regionException,
302    MutableBoolean retryImmediately) {
303    Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
304    if (result == null) {
305      LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
306        + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
307        + regionReq.loc.getRegion().getRegionNameAsString());
308      addError(action, new RuntimeException("Invalid response"), serverName);
309      failedActions.add(action);
310    } else if (result instanceof Throwable) {
311      Throwable error = translateException((Throwable) result);
312      logException(tries, () -> Stream.of(regionReq), error, serverName);
313      conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
314      if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
315        failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
316          getExtraContextForError(serverName));
317      } else {
318        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
319          retryImmediately.setTrue();
320        }
321        failedActions.add(action);
322      }
323    } else {
324      action2Future.get(action).complete((T) result);
325    }
326  }
327
328  private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
329    ServerName serverName, MultiResponse resp) {
330    ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
331      serverName, resp);
332    List<Action> failedActions = new ArrayList<>();
333    MutableBoolean retryImmediately = new MutableBoolean(false);
334    actionsByRegion.forEach((rn, regionReq) -> {
335      RegionResult regionResult = resp.getResults().get(rn);
336      Throwable regionException = resp.getException(rn);
337      if (regionResult != null) {
338        regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
339          regionResult, failedActions, regionException, retryImmediately));
340      } else {
341        Throwable error;
342        if (regionException == null) {
343          LOG.error("Server sent us neither results nor exceptions for {}",
344            Bytes.toStringBinary(rn));
345          error = new RuntimeException("Invalid response");
346        } else {
347          error = translateException(regionException);
348        }
349        logException(tries, () -> Stream.of(regionReq), error, serverName);
350        conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
351        if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
352          failAll(regionReq.actions.stream(), tries, error, serverName);
353          return;
354        }
355        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
356          retryImmediately.setTrue();
357        }
358        addError(regionReq.actions, error, serverName);
359        failedActions.addAll(regionReq.actions);
360      }
361    });
362    if (!failedActions.isEmpty()) {
363      tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
364    }
365  }
366
367  private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
368    long remainingNs;
369    if (operationTimeoutNs > 0) {
370      remainingNs = remainingTimeNs();
371      if (remainingNs <= 0) {
372        failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
373          tries);
374        return;
375      }
376    } else {
377      remainingNs = Long.MAX_VALUE;
378    }
379    ClientService.Interface stub;
380    try {
381      stub = conn.getRegionServerStub(serverName);
382    } catch (IOException e) {
383      onError(serverReq.actionsByRegion, tries, e, serverName);
384      return;
385    }
386    ClientProtos.MultiRequest req;
387    List<CellScannable> cells = new ArrayList<>();
388    // Map from a created RegionAction to the original index for a RowMutations within
389    // the original list of actions. This will be used to process the results when there
390    // is RowMutations/CheckAndMutate in the action list.
391    Map<Integer, Integer> indexMap = new HashMap<>();
392    try {
393      req = buildReq(serverReq.actionsByRegion, cells, indexMap);
394    } catch (IOException e) {
395      onError(serverReq.actionsByRegion, tries, e, serverName);
396      return;
397    }
398    HBaseRpcController controller = conn.rpcControllerFactory.newController();
399    resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
400      calcPriority(serverReq.getPriority(), tableName));
401    if (!cells.isEmpty()) {
402      controller.setCellScanner(createCellScanner(cells));
403    }
404    stub.multi(controller, req, resp -> {
405      if (controller.failed()) {
406        onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName);
407      } else {
408        try {
409          onComplete(serverReq.actionsByRegion, tries, serverName,
410            ResponseConverter.getResults(req, indexMap, resp, controller.cellScanner()));
411        } catch (Exception e) {
412          onError(serverReq.actionsByRegion, tries, e, serverName);
413          return;
414        }
415      }
416    });
417  }
418
419  // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit,
420  // based on the load of the region server and the region.
421  private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) {
422    Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
423    Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
424    if (!optStats.isPresent()) {
425      actionsByServer.forEach((serverName, serverReq) -> {
426        metrics.ifPresent(MetricsConnection::incrNormalRunners);
427        sendToServer(serverName, serverReq, tries);
428      });
429      return;
430    }
431    ServerStatisticTracker stats = optStats.get();
432    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
433    actionsByServer.forEach((serverName, serverReq) -> {
434      ServerStatistics serverStats = stats.getStats(serverName);
435      Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
436      serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
437        long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats);
438        groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
439          .setRegionRequest(regionName, regionReq);
440      });
441      groupByBackoff.forEach((backoff, sr) -> {
442        if (backoff > 0) {
443          metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff));
444          retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff,
445            TimeUnit.MILLISECONDS);
446        } else {
447          metrics.ifPresent(MetricsConnection::incrNormalRunners);
448          sendToServer(serverName, sr, tries);
449        }
450      });
451    });
452  }
453
454  private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
455    ServerName serverName) {
456    Throwable error = translateException(t);
457    logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
458    actionsByRegion.forEach(
459      (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
460    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
461      failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
462        serverName);
463      return;
464    }
465    List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
466      .collect(Collectors.toList());
467    addError(copiedActions, error, serverName);
468    tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
469      HBaseServerException.isServerOverloaded(error));
470  }
471
472  private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
473    boolean isServerOverloaded) {
474    if (immediately) {
475      groupAndSend(actions, tries);
476      return;
477    }
478    long delayNs;
479    long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
480    if (operationTimeoutNs > 0) {
481      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
482      if (maxDelayNs <= 0) {
483        failAll(actions, tries);
484        return;
485      }
486      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
487    } else {
488      delayNs = getPauseTime(pauseNsToUse, tries - 1);
489    }
490    retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
491  }
492
493  private void groupAndSend(Stream<Action> actions, int tries) {
494    long locateTimeoutNs;
495    if (operationTimeoutNs > 0) {
496      locateTimeoutNs = remainingTimeNs();
497      if (locateTimeoutNs <= 0) {
498        failAll(actions, tries);
499        return;
500      }
501    } else {
502      locateTimeoutNs = -1L;
503    }
504    ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
505    ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
506    addListener(CompletableFuture.allOf(actions
507      .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
508        RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
509          if (error != null) {
510            error = unwrapCompletionException(translateException(error));
511            if (error instanceof DoNotRetryIOException) {
512              failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
513              return;
514            }
515            addError(action, error, null);
516            locateFailed.add(action);
517          } else {
518            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
519              action);
520          }
521        }))
522      .toArray(CompletableFuture[]::new)), (v, r) -> {
523        if (!actionsByServer.isEmpty()) {
524          sendOrDelay(actionsByServer, tries);
525        }
526        if (!locateFailed.isEmpty()) {
527          tryResubmit(locateFailed.stream(), tries, false, false);
528        }
529      });
530  }
531
532  public List<CompletableFuture<T>> call() {
533    groupAndSend(actions.stream(), 1);
534    return futures;
535  }
536}