001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
022import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
023import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
024import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
025
026import com.google.errorprone.annotations.RestrictedApi;
027import java.io.IOException;
028import java.io.PrintWriter;
029import java.io.StringWriter;
030import java.util.ArrayList;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.IdentityHashMap;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Optional;
038import java.util.OptionalLong;
039import java.util.concurrent.CompletableFuture;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentLinkedQueue;
042import java.util.concurrent.ConcurrentMap;
043import java.util.concurrent.ConcurrentSkipListMap;
044import java.util.concurrent.TimeUnit;
045import java.util.function.Supplier;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048import org.apache.commons.lang3.mutable.MutableBoolean;
049import org.apache.hadoop.hbase.DoNotRetryIOException;
050import org.apache.hadoop.hbase.ExtendedCellScannable;
051import org.apache.hadoop.hbase.HBaseServerException;
052import org.apache.hadoop.hbase.HConstants;
053import org.apache.hadoop.hbase.HRegionLocation;
054import org.apache.hadoop.hbase.PrivateCellUtil;
055import org.apache.hadoop.hbase.RetryImmediatelyException;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
059import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
060import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
061import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
062import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
063import org.apache.hadoop.hbase.ipc.HBaseRpcController;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.apache.yetus.audience.InterfaceAudience;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.io.netty.util.Timer;
071
072import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
073import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
076
077/**
078 * Retry caller for batch.
079 * <p>
 * Notice that the {@link #operationTimeoutNs} is the total time limit for the whole batch, the
 * same as for other single operations.
082 * <p>
083 * And the {@link #maxAttempts} is a limit for each single operation in the batch logically. In the
084 * implementation, we will record a {@code tries} parameter for each operation group, and if it is
085 * split to several groups when retrying, the sub groups will inherit the {@code tries}. You can
086 * imagine that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of
087 * the depth of the tree.
088 */
089@InterfaceAudience.Private
090class AsyncBatchRpcRetryingCaller<T> {
091
092  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);
093
094  private final Timer retryTimer;
095
096  private final AsyncConnectionImpl conn;
097
098  private final TableName tableName;
099
100  private final List<Action> actions;
101
102  private final List<CompletableFuture<T>> futures;
103
104  private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;
105
106  private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
107
108  private final int maxAttempts;
109
110  private final long operationTimeoutNs;
111
112  private final long rpcTimeoutNs;
113
114  private final int startLogErrorsCnt;
115
116  private final long startNs;
117
118  private final HBaseServerExceptionPauseManager pauseManager;
119
120  private final Map<String, byte[]> requestAttributes;
121
// We cannot use HRegionLocation as the map key because the hashCode and equals methods of
// HRegionLocation only consider serverName.
124  // package private for testing log output
  static final class RegionRequest {

    // the target region location this request is bound for
    public final HRegionLocation loc;

    // actions destined for this region; a concurrent queue because location lookups may
    // complete on different threads and append to the same region's request concurrently
    public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();

    public RegionRequest(HRegionLocation loc) {
      this.loc = loc;
    }
  }
135
136  private static final class ServerRequest {
137
138    public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
139      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
140
141    public void addAction(HRegionLocation loc, Action action) {
142      computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
143        () -> new RegionRequest(loc)).actions.add(action);
144    }
145
146    public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
147      actionsByRegion.put(regionName, regionReq);
148    }
149
150    public int getPriority() {
151      return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
152        .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
153    }
154  }
155
  /**
   * Creates a caller for the given batch of actions.
   * @param retryTimer shared timer used to schedule delayed retries and backoff
   * @param conn connection used for region location, rpc stubs, nonces and metrics
   * @param tableName table all actions belong to
   * @param actions the raw user operations; each is wrapped in an {@link Action} carrying its
   *          original index so results can be matched back to the caller's ordering
   * @param pauseNs base pause between retries
   * @param pauseNsForServerOverloaded pause to use when the server reports it is overloaded
   * @param maxAttempts per-operation limit on retry depth (see class comment)
   * @param operationTimeoutNs total time budget for the whole batch; a value <= 0 means no limit
   * @param rpcTimeoutNs timeout for each individual rpc
   * @param startLogErrorsCnt number of tries after which failures start being logged
   * @param requestAttributes attributes attached to every outgoing rpc
   */
  public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
    TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded,
    int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
    Map<String, byte[]> requestAttributes) {
    this.retryTimer = retryTimer;
    this.conn = conn;
    this.tableName = tableName;
    this.maxAttempts = maxAttempts;
    this.operationTimeoutNs = operationTimeoutNs;
    this.rpcTimeoutNs = rpcTimeoutNs;
    this.startLogErrorsCnt = startLogErrorsCnt;
    this.actions = new ArrayList<>(actions.size());
    this.futures = new ArrayList<>(actions.size());
    // identity map: each Action instance is paired with exactly one result future
    this.action2Future = new IdentityHashMap<>(actions.size());
    for (int i = 0, n = actions.size(); i < n; i++) {
      Row rawAction = actions.get(i);
      Action action;
      if (rawAction instanceof OperationWithAttributes) {
        action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority());
      } else {
        action = new Action(rawAction, i);
      }
      // Increment/Append (also nested inside RowMutations/CheckAndMutate) need a nonce so
      // retries are not applied twice on the server side
      if (hasIncrementOrAppend(rawAction)) {
        action.setNonce(conn.getNonceGenerator().newNonce());
      }
      this.actions.add(action);
      CompletableFuture<T> future = new CompletableFuture<>();
      futures.add(future);
      action2Future.put(action, future);
    }
    this.action2Errors = new IdentityHashMap<>();
    this.startNs = System.nanoTime();
    this.pauseManager =
      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
    this.requestAttributes = requestAttributes;
  }
192
193  private static boolean hasIncrementOrAppend(Row action) {
194    if (action instanceof Append || action instanceof Increment) {
195      return true;
196    } else if (action instanceof RowMutations) {
197      return hasIncrementOrAppend((RowMutations) action);
198    } else if (action instanceof CheckAndMutate) {
199      return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
200    }
201    return false;
202  }
203
204  private static boolean hasIncrementOrAppend(RowMutations mutations) {
205    for (Mutation mutation : mutations.getMutations()) {
206      if (mutation instanceof Append || mutation instanceof Increment) {
207        return true;
208      }
209    }
210    return false;
211  }
212
  /**
   * Atomically removes and returns the errors recorded so far for {@code action}, or {@code null}
   * if none were recorded.
   */
  private List<ThrowableWithExtraContext> removeErrors(Action action) {
    synchronized (action2Errors) {
      return action2Errors.remove(action);
    }
  }
218
219  private void logRegionsException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
220    Throwable error, ServerName serverName) {
221    if (tries > startLogErrorsCnt) {
222      String regions =
223        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
224          .collect(Collectors.joining(",", "[", "]"));
225      LOG.warn("Process batch for {} from {} failed, tries={}", regions, serverName, tries, error);
226    }
227  }
228
229  private static final int MAX_SAMPLED_ERRORS = 3;
230
231  @RestrictedApi(explanation = "Should only be called in tests", link = "",
232      allowedOnPath = ".*/(src/test/|AsyncBatchRpcRetryingCaller).*")
233  static void logActionsException(int tries, int startLogErrorsCnt, RegionRequest regionReq,
234    IdentityHashMap<Action, Throwable> action2Error, ServerName serverName) {
235    if (tries <= startLogErrorsCnt || action2Error.isEmpty()) {
236      return;
237    }
238    if (LOG.isWarnEnabled()) {
239      StringWriter sw = new StringWriter();
240      PrintWriter action2ErrorWriter = new PrintWriter(sw);
241      action2ErrorWriter.println();
242      Iterator<Map.Entry<Action, Throwable>> iter = action2Error.entrySet().iterator();
243      for (int i = 0; i < MAX_SAMPLED_ERRORS && iter.hasNext(); i++) {
244        Map.Entry<Action, Throwable> entry = iter.next();
245        action2ErrorWriter.print(entry.getKey().getAction());
246        action2ErrorWriter.print(" => ");
247        entry.getValue().printStackTrace(action2ErrorWriter);
248      }
249      action2ErrorWriter.flush();
250      LOG.warn("Process batch for {} on {}, {}/{} actions failed, tries={}, sampled {} errors: {}",
251        regionReq.loc.getRegion().getRegionNameAsString(), serverName, action2Error.size(),
252        regionReq.actions.size(), tries, Math.min(MAX_SAMPLED_ERRORS, action2Error.size()),
253        sw.toString());
254    }
255    // if trace is enabled, we log all the details
256    if (LOG.isTraceEnabled()) {
257      action2Error.forEach((action, error) -> LOG.trace(
258        "Process action {} in batch for {} on {} failed, tries={}", action.getAction(),
259        regionReq.loc.getRegion().getRegionNameAsString(), serverName, tries, error));
260    }
261  }
262
263  private String getExtraContextForError(ServerName serverName) {
264    return serverName != null ? serverName.getServerName() : "";
265  }
266
  /**
   * Records an error for a single action; the accumulated errors are replayed inside the
   * {@link RetriesExhaustedException} if the action eventually fails for good.
   */
  private void addError(Action action, Throwable error, ServerName serverName) {
    List<ThrowableWithExtraContext> errors;
    synchronized (action2Errors) {
      errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
    }
    // NOTE(review): the list is mutated outside the lock; this presumably relies on a single
    // action only being in flight to one server at a time so its error list is never appended
    // concurrently — confirm before changing the retry flow
    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
      getExtraContextForError(serverName)));
  }
275
276  private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
277    actions.forEach(action -> addError(action, error, serverName));
278  }
279
280  private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
281    CompletableFuture<T> future = action2Future.get(action);
282    if (future.isDone()) {
283      return;
284    }
285    ThrowableWithExtraContext errorWithCtx =
286      new ThrowableWithExtraContext(error, currentTime, extras);
287    List<ThrowableWithExtraContext> errors = removeErrors(action);
288    if (errors == null) {
289      errors = Collections.singletonList(errorWithCtx);
290    } else {
291      errors.add(errorWithCtx);
292    }
293    future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
294  }
295
296  private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
297    long currentTime = EnvironmentEdgeManager.currentTime();
298    String extras = getExtraContextForError(serverName);
299    actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
300  }
301
302  private void failAll(Stream<Action> actions, int tries) {
303    actions.forEach(action -> {
304      CompletableFuture<T> future = action2Future.get(action);
305      if (future.isDone()) {
306        return;
307      }
308      future.completeExceptionally(new RetriesExhaustedException(tries,
309        Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
310    });
311  }
312
313  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
314    List<ExtendedCellScannable> cells, Map<Integer, Integer> indexMap) throws IOException {
315    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
316    ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
317    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
318    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
319    for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
320      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
321      // multiRequestBuilder will be populated with region actions.
322      // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the
323      // action list.
324      RequestConverter.buildNoDataRegionActions(entry.getKey(),
325        entry.getValue().actions.stream()
326          .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex()))
327          .collect(Collectors.toList()),
328        cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
329        indexMap);
330    }
331    return multiRequestBuilder.build();
332  }
333
  /**
   * Processes the outcome of one action inside a region's result.
   * <p>
   * The region result holds, per original index, either a value or a Throwable; a missing entry
   * falls back to the region-level exception (which may itself be null).
   * @param failedActions output list collecting actions that should be retried
   * @param retryImmediately flipped to true when any error requests an immediate retry
   * @param action2Error output map collecting per-action errors for batched log output
   */
  @SuppressWarnings("unchecked")
  private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
    RegionResult regionResult, List<Action> failedActions, Throwable regionException,
    MutableBoolean retryImmediately, IdentityHashMap<Action, Throwable> action2Error) {
    Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
    if (result == null) {
      // neither a result nor an exception: treat the malformed response as retryable
      LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
        + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
        + regionReq.loc.getRegion().getRegionNameAsString());
      addError(action, new RuntimeException("Invalid response"), serverName);
      failedActions.add(action);
    } else if (result instanceof Throwable) {
      Throwable error = translateException((Throwable) result);
      action2Error.put(action, error);
      // the error may indicate a stale region location, so let the locator react to it
      conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
      if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
        // non-retryable error or out of attempts: fail this action's future now
        failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
          getExtraContextForError(serverName));
      } else {
        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
          retryImmediately.setTrue();
        }
        failedActions.add(action);
      }
    } else {
      // success: deliver the server's result to the caller
      action2Future.get(action).complete((T) result);
    }
  }
362
  /**
   * Processes a MultiResponse from one server: completes successful actions, fails non-retryable
   * ones, and collects the rest for resubmission.
   */
  private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
    ServerName serverName, MultiResponse resp) {
    ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
      serverName, resp);
    List<Action> failedActions = new ArrayList<>();
    MutableBoolean retryImmediately = new MutableBoolean(false);
    actionsByRegion.forEach((rn, regionReq) -> {
      RegionResult regionResult = resp.getResults().get(rn);
      Throwable regionException = resp.getException(rn);
      if (regionResult != null) {
        // Here we record the exceptions and log them at once, to avoid flooding log output if
        // lots of actions failed. For example, if the region's memstore is full, all actions
        // will receive a RegionTooBusyException, see HBASE-29390.
        IdentityHashMap<Action, Throwable> action2Error = new IdentityHashMap<>();
        regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
          regionResult, failedActions, regionException, retryImmediately, action2Error));
        logActionsException(tries, startLogErrorsCnt, regionReq, action2Error, serverName);
      } else {
        // no per-action results for this region: a single region-level error applies to all of
        // its actions
        Throwable error;
        if (regionException == null) {
          LOG.error("Server sent us neither results nor exceptions for {}",
            Bytes.toStringBinary(rn));
          error = new RuntimeException("Invalid response");
        } else {
          error = translateException(regionException);
        }
        logRegionsException(tries, () -> Stream.of(regionReq), error, serverName);
        conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
        if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
          failAll(regionReq.actions.stream(), tries, error, serverName);
          return;
        }
        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
          retryImmediately.setTrue();
        }
        addError(regionReq.actions, error, serverName);
        failedActions.addAll(regionReq.actions);
      }
    });
    if (!failedActions.isEmpty()) {
      tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null);
    }
  }
406
  /**
   * Builds and sends one multi rpc to a single server, wiring the response/failure back into
   * {@link #onComplete(Map, int, ServerName, MultiResponse)} / {@link #onError}.
   */
  private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
    long remainingNs;
    if (operationTimeoutNs > 0) {
      // enforce the overall batch deadline before spending time on an rpc
      remainingNs = pauseManager.remainingTimeNs(startNs);
      if (remainingNs <= 0) {
        failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
          tries);
        return;
      }
    } else {
      remainingNs = Long.MAX_VALUE;
    }
    ClientService.Interface stub;
    try {
      stub = conn.getRegionServerStub(serverName);
    } catch (IOException e) {
      onError(serverReq.actionsByRegion, tries, e, serverName);
      return;
    }
    ClientProtos.MultiRequest req;
    List<ExtendedCellScannable> cells = new ArrayList<>();
    // Map from a created RegionAction to the original index for a RowMutations within
    // the original list of actions. This will be used to process the results when there
    // is RowMutations/CheckAndMutate in the action list.
    Map<Integer, Integer> indexMap = new HashMap<>();
    try {
      req = buildReq(serverReq.actionsByRegion, cells, indexMap);
    } catch (IOException e) {
      onError(serverReq.actionsByRegion, tries, e, serverName);
      return;
    }
    HBaseRpcController controller = conn.rpcControllerFactory.newController();
    // the rpc timeout is capped by whatever remains of the overall operation timeout
    resetController(controller, Math.min(rpcTimeoutNs, remainingNs), serverReq.getPriority(),
      tableName);
    controller.setRequestAttributes(requestAttributes);
    if (!cells.isEmpty()) {
      // cell payloads travel via the rpc cell scanner instead of inside the protobuf message
      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cells));
    }
    stub.multi(controller, req, resp -> {
      if (controller.failed()) {
        onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName);
      } else {
        try {
          onComplete(serverReq.actionsByRegion, tries, serverName,
            ResponseConverter.getResults(req, indexMap, resp, controller.cellScanner()));
        } catch (Exception e) {
          // a malformed response is treated like an rpc-level failure for all actions
          onError(serverReq.actionsByRegion, tries, e, serverName);
          return;
        }
      }
    });
  }
459
  // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit,
  // based on the load of the region server and the region.
  private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) {
    Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
    Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
    if (!optStats.isPresent()) {
      // no statistics available: everything goes out immediately
      actionsByServer.forEach((serverName, serverReq) -> {
        metrics.ifPresent(MetricsConnection::incrNormalRunners);
        sendToServer(serverName, serverReq, tries);
      });
      return;
    }
    ServerStatisticTracker stats = optStats.get();
    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
    actionsByServer.forEach((serverName, serverReq) -> {
      ServerStatistics serverStats = stats.getStats(serverName);
      // regions on the same server may need different backoffs, so regroup the server request
      // into one sub-request per distinct backoff value
      Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
      serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
        long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats);
        groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
          .setRegionRequest(regionName, regionReq);
      });
      groupByBackoff.forEach((backoff, sr) -> {
        if (backoff > 0) {
          // delayed send, scheduled on the shared retry timer
          metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff));
          retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff,
            TimeUnit.MILLISECONDS);
        } else {
          metrics.ifPresent(MetricsConnection::incrNormalRunners);
          sendToServer(serverName, sr, tries);
        }
      });
    });
  }
494
  /**
   * Handles an rpc-level failure that affects every action sent to one server: updates cached
   * locations, then either fails everything (non-retryable / out of attempts) or resubmits.
   */
  private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
    ServerName serverName) {
    Throwable error = translateException(t);
    logRegionsException(tries, () -> actionsByRegion.values().stream(), error, serverName);
    actionsByRegion.forEach(
      (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
      failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
        serverName);
      return;
    }
    // materialize the actions before resubmitting, since the region requests may be regrouped
    List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
      .collect(Collectors.toList());
    addError(copiedActions, error, serverName);
    tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error);
  }
511
  /**
   * Resubmits the given actions, either immediately or after a pause chosen by the pause manager
   * based on the error and remaining operation time; fails everything if no time remains.
   */
  private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
    Throwable error) {
    if (immediately) {
      // immediate retry keeps the same try count, i.e. it does not consume an attempt
      groupAndSend(actions, tries);
      return;
    }

    // empty result means the remaining operation time cannot cover another pause + attempt
    OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
    if (!maybePauseNsToUse.isPresent()) {
      failAll(actions, tries);
      return;
    }
    long delayNs = maybePauseNsToUse.getAsLong();
    if (HBaseServerException.isServerOverloaded(error)) {
      Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
    }
    retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
  }
531
  /**
   * Locates the region for every action (asynchronously, possibly on multiple threads), groups
   * the actions by target server, then sends each group. Actions whose location lookup fails with
   * a retryable error are resubmitted as a batch.
   */
  private void groupAndSend(Stream<Action> actions, int tries) {
    long locateTimeoutNs;
    if (operationTimeoutNs > 0) {
      locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
      if (locateTimeoutNs <= 0) {
        failAll(actions, tries);
        return;
      }
    } else {
      // no operation timeout configured: let the locator use its own default behavior
      locateTimeoutNs = -1L;
    }
    // concurrent containers because the location callbacks may run on different threads
    ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
    ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
    addListener(CompletableFuture.allOf(actions
      .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
        RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
          if (error != null) {
            error = unwrapCompletionException(translateException(error));
            if (error instanceof DoNotRetryIOException) {
              failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
              return;
            }
            addError(action, error, null);
            locateFailed.add(action);
          } else {
            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
              action);
          }
        }))
      .toArray(CompletableFuture[]::new)), (v, r) -> {
        // runs once every location lookup has settled (successfully or not)
        if (!actionsByServer.isEmpty()) {
          sendOrDelay(actionsByServer, tries);
        }
        if (!locateFailed.isEmpty()) {
          tryResubmit(locateFailed.stream(), tries, false, null);
        }
      });
  }
570
  /**
   * Starts the batch: groups all actions by region/server and sends them, beginning at try 1.
   * @return one future per input action, in the same order as the actions given to the constructor
   */
  public List<CompletableFuture<T>> call() {
    groupAndSend(actions.stream(), 1);
    return futures;
  }
575}