001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
026import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
027import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
028import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
029
030import java.io.IOException;
031import java.util.ArrayList;
032import java.util.Collections;
033import java.util.HashMap;
034import java.util.IdentityHashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.Optional;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentLinkedQueue;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.ConcurrentSkipListMap;
043import java.util.concurrent.TimeUnit;
044import java.util.function.Supplier;
045import java.util.stream.Collectors;
046import java.util.stream.Stream;
047import org.apache.commons.lang3.mutable.MutableBoolean;
048import org.apache.hadoop.hbase.CallQueueTooBigException;
049import org.apache.hadoop.hbase.CellScannable;
050import org.apache.hadoop.hbase.DoNotRetryIOException;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.HRegionLocation;
053import org.apache.hadoop.hbase.RetryImmediatelyException;
054import org.apache.hadoop.hbase.ServerName;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
057import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
058import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
059import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
060import org.apache.hadoop.hbase.ipc.HBaseRpcController;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
063import org.apache.yetus.audience.InterfaceAudience;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import org.apache.hbase.thirdparty.io.netty.util.Timer;
068
069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
070import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
073
074/**
075 * Retry caller for batch.
076 * <p>
077 * Notice that, the {@link #operationTimeoutNs} is the total time limit now which is the same with
078 * other single operations
079 * <p>
080 * And the {@link #maxAttempts} is a limit for each single operation in the batch logically. In the
081 * implementation, we will record a {@code tries} parameter for each operation group, and if it is
082 * split to several groups when retrying, the sub groups will inherit the {@code tries}. You can
083 * imagine that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of
084 * the depth of the tree.
085 */
086@InterfaceAudience.Private
087class AsyncBatchRpcRetryingCaller<T> {
088
089  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);
090
091  private final Timer retryTimer;
092
093  private final AsyncConnectionImpl conn;
094
095  private final TableName tableName;
096
097  private final List<Action> actions;
098
099  private final List<CompletableFuture<T>> futures;
100
101  private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;
102
103  private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
104
105  private final long pauseNs;
106
107  private final long pauseForCQTBENs;
108
109  private final int maxAttempts;
110
111  private final long operationTimeoutNs;
112
113  private final long rpcTimeoutNs;
114
115  private final int startLogErrorsCnt;
116
117  private final long startNs;
118
119  // we can not use HRegionLocation as the map key because the hashCode and equals method of
120  // HRegionLocation only consider serverName.
121  private static final class RegionRequest {
122
123    public final HRegionLocation loc;
124
125    public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();
126
127    public RegionRequest(HRegionLocation loc) {
128      this.loc = loc;
129    }
130  }
131
132  private static final class ServerRequest {
133
134    public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
135      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
136
137    public void addAction(HRegionLocation loc, Action action) {
138      computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
139        () -> new RegionRequest(loc)).actions.add(action);
140    }
141
142    public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
143      actionsByRegion.put(regionName, regionReq);
144    }
145
146    public int getPriority() {
147      return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
148        .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
149    }
150  }
151
152  public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
153      TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
154      int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
155    this.retryTimer = retryTimer;
156    this.conn = conn;
157    this.tableName = tableName;
158    this.pauseNs = pauseNs;
159    this.pauseForCQTBENs = pauseForCQTBENs;
160    this.maxAttempts = maxAttempts;
161    this.operationTimeoutNs = operationTimeoutNs;
162    this.rpcTimeoutNs = rpcTimeoutNs;
163    this.startLogErrorsCnt = startLogErrorsCnt;
164    this.actions = new ArrayList<>(actions.size());
165    this.futures = new ArrayList<>(actions.size());
166    this.action2Future = new IdentityHashMap<>(actions.size());
167    for (int i = 0, n = actions.size(); i < n; i++) {
168      Row rawAction = actions.get(i);
169      Action action;
170      if (rawAction instanceof OperationWithAttributes) {
171        action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority());
172      } else {
173        action = new Action(rawAction, i);
174      }
175      if (rawAction instanceof Append || rawAction instanceof Increment) {
176        action.setNonce(conn.getNonceGenerator().newNonce());
177      }
178      this.actions.add(action);
179      CompletableFuture<T> future = new CompletableFuture<>();
180      futures.add(future);
181      action2Future.put(action, future);
182    }
183    this.action2Errors = new IdentityHashMap<>();
184    this.startNs = System.nanoTime();
185  }
186
187  private long remainingTimeNs() {
188    return operationTimeoutNs - (System.nanoTime() - startNs);
189  }
190
191  private List<ThrowableWithExtraContext> removeErrors(Action action) {
192    synchronized (action2Errors) {
193      return action2Errors.remove(action);
194    }
195  }
196
197  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
198      Throwable error, ServerName serverName) {
199    if (tries > startLogErrorsCnt) {
200      String regions =
201        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
202          .collect(Collectors.joining(",", "[", "]"));
203      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName +
204        " failed, tries=" + tries, error);
205    }
206  }
207
208  private String getExtraContextForError(ServerName serverName) {
209    return serverName != null ? serverName.getServerName() : "";
210  }
211
212  private void addError(Action action, Throwable error, ServerName serverName) {
213    List<ThrowableWithExtraContext> errors;
214    synchronized (action2Errors) {
215      errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
216    }
217    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
218      getExtraContextForError(serverName)));
219  }
220
221  private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
222    actions.forEach(action -> addError(action, error, serverName));
223  }
224
225  private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
226    CompletableFuture<T> future = action2Future.get(action);
227    if (future.isDone()) {
228      return;
229    }
230    ThrowableWithExtraContext errorWithCtx =
231      new ThrowableWithExtraContext(error, currentTime, extras);
232    List<ThrowableWithExtraContext> errors = removeErrors(action);
233    if (errors == null) {
234      errors = Collections.singletonList(errorWithCtx);
235    } else {
236      errors.add(errorWithCtx);
237    }
238    future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
239  }
240
241  private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
242    long currentTime = EnvironmentEdgeManager.currentTime();
243    String extras = getExtraContextForError(serverName);
244    actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
245  }
246
247  private void failAll(Stream<Action> actions, int tries) {
248    actions.forEach(action -> {
249      CompletableFuture<T> future = action2Future.get(action);
250      if (future.isDone()) {
251        return;
252      }
253      future.completeExceptionally(new RetriesExhaustedException(tries,
254        Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
255    });
256  }
257
258  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
259      List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
260    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
261    ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
262    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
263    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
264    for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
265      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
266      // multiRequestBuilder will be populated with region actions.
267      // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
268      // action list.
269      RequestConverter.buildNoDataRegionActions(entry.getKey(),
270        entry.getValue().actions.stream()
271          .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex()))
272          .collect(Collectors.toList()),
273        cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
274        rowMutationsIndexMap);
275    }
276    return multiRequestBuilder.build();
277  }
278
279  @SuppressWarnings("unchecked")
280  private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
281      RegionResult regionResult, List<Action> failedActions, Throwable regionException,
282      MutableBoolean retryImmediately) {
283    Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
284    if (result == null) {
285      LOG.error("Server " + serverName + " sent us neither result nor exception for row '" +
286        Bytes.toStringBinary(action.getAction().getRow()) + "' of " +
287        regionReq.loc.getRegion().getRegionNameAsString());
288      addError(action, new RuntimeException("Invalid response"), serverName);
289      failedActions.add(action);
290    } else if (result instanceof Throwable) {
291      Throwable error = translateException((Throwable) result);
292      logException(tries, () -> Stream.of(regionReq), error, serverName);
293      conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
294      if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
295        failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
296          getExtraContextForError(serverName));
297      } else {
298        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
299          retryImmediately.setTrue();
300        }
301        failedActions.add(action);
302      }
303    } else {
304      action2Future.get(action).complete((T) result);
305    }
306  }
307
308  private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
309      ServerName serverName, MultiResponse resp) {
310    ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
311      serverName, resp);
312    List<Action> failedActions = new ArrayList<>();
313    MutableBoolean retryImmediately = new MutableBoolean(false);
314    actionsByRegion.forEach((rn, regionReq) -> {
315      RegionResult regionResult = resp.getResults().get(rn);
316      Throwable regionException = resp.getException(rn);
317      if (regionResult != null) {
318        regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
319          regionResult, failedActions, regionException, retryImmediately));
320      } else {
321        Throwable error;
322        if (regionException == null) {
323          LOG.error("Server sent us neither results nor exceptions for {}",
324            Bytes.toStringBinary(rn));
325          error = new RuntimeException("Invalid response");
326        } else {
327          error = translateException(regionException);
328        }
329        logException(tries, () -> Stream.of(regionReq), error, serverName);
330        conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
331        if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
332          failAll(regionReq.actions.stream(), tries, error, serverName);
333          return;
334        }
335        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
336          retryImmediately.setTrue();
337        }
338        addError(regionReq.actions, error, serverName);
339        failedActions.addAll(regionReq.actions);
340      }
341    });
342    if (!failedActions.isEmpty()) {
343      tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
344    }
345  }
346
347  private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
348    long remainingNs;
349    if (operationTimeoutNs > 0) {
350      remainingNs = remainingTimeNs();
351      if (remainingNs <= 0) {
352        failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
353          tries);
354        return;
355      }
356    } else {
357      remainingNs = Long.MAX_VALUE;
358    }
359    ClientService.Interface stub;
360    try {
361      stub = conn.getRegionServerStub(serverName);
362    } catch (IOException e) {
363      onError(serverReq.actionsByRegion, tries, e, serverName);
364      return;
365    }
366    ClientProtos.MultiRequest req;
367    List<CellScannable> cells = new ArrayList<>();
368    // Map from a created RegionAction to the original index for a RowMutations within
369    // the original list of actions. This will be used to process the results when there
370    // is RowMutations in the action list.
371    Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
372    try {
373      req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
374    } catch (IOException e) {
375      onError(serverReq.actionsByRegion, tries, e, serverName);
376      return;
377    }
378    HBaseRpcController controller = conn.rpcControllerFactory.newController();
379    resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
380      calcPriority(serverReq.getPriority(), tableName));
381    if (!cells.isEmpty()) {
382      controller.setCellScanner(createCellScanner(cells));
383    }
384    stub.multi(controller, req, resp -> {
385      if (controller.failed()) {
386        onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName);
387      } else {
388        try {
389          onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req,
390            rowMutationsIndexMap, resp, controller.cellScanner()));
391        } catch (Exception e) {
392          onError(serverReq.actionsByRegion, tries, e, serverName);
393          return;
394        }
395      }
396    });
397  }
398
399  // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit,
400  // based on the load of the region server and the region.
401  private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) {
402    Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
403    Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
404    if (!optStats.isPresent()) {
405      actionsByServer.forEach((serverName, serverReq) -> {
406        metrics.ifPresent(MetricsConnection::incrNormalRunners);
407        sendToServer(serverName, serverReq, tries);
408      });
409      return;
410    }
411    ServerStatisticTracker stats = optStats.get();
412    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
413    actionsByServer.forEach((serverName, serverReq) -> {
414      ServerStatistics serverStats = stats.getStats(serverName);
415      Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
416      serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
417        long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats);
418        groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
419          .setRegionRequest(regionName, regionReq);
420      });
421      groupByBackoff.forEach((backoff, sr) -> {
422        if (backoff > 0) {
423          metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff));
424          retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff,
425            TimeUnit.MILLISECONDS);
426        } else {
427          metrics.ifPresent(MetricsConnection::incrNormalRunners);
428          sendToServer(serverName, sr, tries);
429        }
430      });
431    });
432  }
433
434  private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
435      ServerName serverName) {
436    Throwable error = translateException(t);
437    logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
438    actionsByRegion.forEach(
439      (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
440    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
441      failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
442        serverName);
443      return;
444    }
445    List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
446      .collect(Collectors.toList());
447    addError(copiedActions, error, serverName);
448    tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
449      error instanceof CallQueueTooBigException);
450  }
451
452  private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
453      boolean isCallQueueTooBig) {
454    if (immediately) {
455      groupAndSend(actions, tries);
456      return;
457    }
458    long delayNs;
459    long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
460    if (operationTimeoutNs > 0) {
461      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
462      if (maxDelayNs <= 0) {
463        failAll(actions, tries);
464        return;
465      }
466      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
467    } else {
468      delayNs = getPauseTime(pauseNsToUse, tries - 1);
469    }
470    retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
471  }
472
473  private void groupAndSend(Stream<Action> actions, int tries) {
474    long locateTimeoutNs;
475    if (operationTimeoutNs > 0) {
476      locateTimeoutNs = remainingTimeNs();
477      if (locateTimeoutNs <= 0) {
478        failAll(actions, tries);
479        return;
480      }
481    } else {
482      locateTimeoutNs = -1L;
483    }
484    ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
485    ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
486    addListener(CompletableFuture.allOf(actions
487      .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
488        RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
489          if (error != null) {
490            error = unwrapCompletionException(translateException(error));
491            if (error instanceof DoNotRetryIOException) {
492              failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
493              return;
494            }
495            addError(action, error, null);
496            locateFailed.add(action);
497          } else {
498            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
499              action);
500          }
501        }))
502      .toArray(CompletableFuture[]::new)), (v, r) -> {
503        if (!actionsByServer.isEmpty()) {
504          sendOrDelay(actionsByServer, tries);
505        }
506        if (!locateFailed.isEmpty()) {
507          tryResubmit(locateFailed.stream(), tries, false, false);
508        }
509      });
510  }
511
512  public List<CompletableFuture<T>> call() {
513    groupAndSend(actions.stream(), 1);
514    return futures;
515  }
516}