/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;

/**
 * Retry caller for batch operations.
 * <p>
 * Notice that the {@link #operationTimeoutNs} is the total time limit for the whole batch, the
 * same as for other single operations.
 * <p>
 * The {@link #maxAttempts} is logically a limit for each single operation in the batch. In the
 * implementation, we record a {@code tries} parameter for each operation group, and if the group
 * is split into several sub groups when retrying, the sub groups inherit the {@code tries}. You
 * can think of the whole retrying process as a tree, where {@link #maxAttempts} limits the depth
 * of the tree.
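 * <p>
 * A rough usage sketch (illustrative only; this caller is normally created and driven by the
 * async table implementation, the {@code retryTimer}, {@code conn} and {@code gets} below are
 * assumed to be supplied by the surrounding code, and the timeout values are arbitrary):
 *
 * <pre>{@code
 * AsyncBatchRpcRetryingCaller<Result> caller = new AsyncBatchRpcRetryingCaller<>(
 *   retryTimer, conn, TableName.valueOf("test"), gets,
 *   TimeUnit.MILLISECONDS.toNanos(100), 31,
 *   TimeUnit.MINUTES.toNanos(20), TimeUnit.MINUTES.toNanos(1), 5);
 * List<CompletableFuture<Result>> futures = caller.call();
 * }</pre>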
 */
@InterfaceAudience.Private
class AsyncBatchRpcRetryingCaller<T> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);

  private final Timer retryTimer;

  private final AsyncConnectionImpl conn;

  private final TableName tableName;

  private final List<Action> actions;

  private final List<CompletableFuture<T>> futures;

  private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;

  private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;

  private final long pauseNs;

  private final int maxAttempts;

  private final long operationTimeoutNs;

  private final long rpcTimeoutNs;

  private final int startLogErrorsCnt;

  private final long startNs;

  // We cannot use HRegionLocation as the map key because the hashCode and equals methods of
  // HRegionLocation only consider serverName.
  private static final class RegionRequest {

    public final HRegionLocation loc;

    public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();

    public RegionRequest(HRegionLocation loc) {
      this.loc = loc;
    }
  }

  private static final class ServerRequest {

    public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

    public void addAction(HRegionLocation loc, Action action) {
      computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
        () -> new RegionRequest(loc)).actions.add(action);
    }
  }

  public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
      TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
    this.retryTimer = retryTimer;
    this.conn = conn;
    this.tableName = tableName;
    this.pauseNs = pauseNs;
    this.maxAttempts = maxAttempts;
    this.operationTimeoutNs = operationTimeoutNs;
    this.rpcTimeoutNs = rpcTimeoutNs;
    this.startLogErrorsCnt = startLogErrorsCnt;

    this.actions = new ArrayList<>(actions.size());
    this.futures = new ArrayList<>(actions.size());
    this.action2Future = new IdentityHashMap<>(actions.size());
    for (int i = 0, n = actions.size(); i < n; i++) {
      Row rawAction = actions.get(i);
      Action action = new Action(rawAction, i);
      if (rawAction instanceof Append || rawAction instanceof Increment) {
        action.setNonce(conn.getNonceGenerator().newNonce());
      }
      this.actions.add(action);
      CompletableFuture<T> future = new CompletableFuture<>();
      futures.add(future);
      action2Future.put(action, future);
    }
    this.action2Errors = new IdentityHashMap<>();
    this.startNs = System.nanoTime();
  }

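  // Returns how much of the total operation timeout is left; may be non-positive if the
  // deadline has already passed.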
  private long remainingTimeNs() {
    return operationTimeoutNs - (System.nanoTime() - startNs);
  }

  private List<ThrowableWithExtraContext> removeErrors(Action action) {
    synchronized (action2Errors) {
      return action2Errors.remove(action);
    }
  }

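  // Log the failure for the given regions once the number of tries exceeds startLogErrorsCnt,
  // so that the first few (expected) retries do not flood the log.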
  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
      Throwable error, ServerName serverName) {
    if (tries > startLogErrorsCnt) {
      String regions =
        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
          .collect(Collectors.joining(",", "[", "]"));
      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName +
        " failed, tries=" + tries, error);
    }
  }

  private String getExtraContextForError(ServerName serverName) {
    return serverName != null ? serverName.getServerName() : "";
  }

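  // Record the error against the action so it can be reported if we eventually exhaust retries.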
  private void addError(Action action, Throwable error, ServerName serverName) {
    List<ThrowableWithExtraContext> errors;
    synchronized (action2Errors) {
      errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
    }
    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
      getExtraContextForError(serverName)));
  }

  private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
    actions.forEach(action -> addError(action, error, serverName));
  }

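  // Complete the future of a single action exceptionally with all the errors collected so far.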
  private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
    CompletableFuture<T> future = action2Future.get(action);
    if (future.isDone()) {
      return;
    }
    ThrowableWithExtraContext errorWithCtx =
      new ThrowableWithExtraContext(error, currentTime, extras);
    List<ThrowableWithExtraContext> errors = removeErrors(action);
    if (errors == null) {
      errors = Collections.singletonList(errorWithCtx);
    } else {
      errors.add(errorWithCtx);
    }
    future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
  }

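  // Fail all the given actions with the same error, e.g. when the whole request to a server fails.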
  private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    String extras = getExtraContextForError(serverName);
    actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
  }

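  // Fail all the given actions using only the errors already recorded, typically because the
  // operation timeout has been reached before another attempt could be sent.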
  private void failAll(Stream<Action> actions, int tries) {
    actions.forEach(action -> {
      CompletableFuture<T> future = action2Future.get(action);
      if (future.isDone()) {
        return;
      }
      future.completeExceptionally(new RetriesExhaustedException(tries,
        Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
    });
  }

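  // Build a single MultiRequest covering all the per-region actions destined for one server.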
  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
      List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
    ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
    for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
      // multiRequestBuilder will be populated with region actions.
      // rowMutationsIndexMap will be non-empty after the call if the action list contains
      // RowMutations.
      RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells,
        multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
        rowMutationsIndexMap);
    }
    return multiRequestBuilder.build();
  }

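  // Process the result for a single action: complete its future on success, fail it permanently
  // for non-retryable errors or exhausted attempts, otherwise queue it for resubmission.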
  @SuppressWarnings("unchecked")
  private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
      RegionResult regionResult, List<Action> failedActions, Throwable regionException,
      MutableBoolean retryImmediately) {
    Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
    if (result == null) {
      LOG.error("Server " + serverName + " sent us neither result nor exception for row '" +
        Bytes.toStringBinary(action.getAction().getRow()) + "' of " +
        regionReq.loc.getRegion().getRegionNameAsString());
      addError(action, new RuntimeException("Invalid response"), serverName);
      failedActions.add(action);
    } else if (result instanceof Throwable) {
      Throwable error = translateException((Throwable) result);
      logException(tries, () -> Stream.of(regionReq), error, serverName);
      conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
      if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
        failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
          getExtraContextForError(serverName));
      } else {
        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
          retryImmediately.setTrue();
        }
        failedActions.add(action);
      }
    } else {
      action2Future.get(action).complete((T) result);
    }
  }

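  // Process the MultiResponse from one server: dispatch per-action results, handle region level
  // failures, and resubmit whatever failed but is still retryable.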
  private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
      ServerName serverName, MultiResponse resp) {
    List<Action> failedActions = new ArrayList<>();
    MutableBoolean retryImmediately = new MutableBoolean(false);
    actionsByRegion.forEach((rn, regionReq) -> {
      RegionResult regionResult = resp.getResults().get(rn);
      Throwable regionException = resp.getException(rn);
      if (regionResult != null) {
        regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
          regionResult, failedActions, regionException, retryImmediately));
      } else {
        Throwable error;
        if (regionException == null) {
          LOG.error("Server sent us neither results nor exceptions for {}",
            Bytes.toStringBinary(rn));
          error = new RuntimeException("Invalid response");
        } else {
          error = translateException(regionException);
        }
        logException(tries, () -> Stream.of(regionReq), error, serverName);
        conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
        if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
          failAll(regionReq.actions.stream(), tries, error, serverName);
          return;
        }
        if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
          retryImmediately.setTrue();
        }
        addError(regionReq.actions, error, serverName);
        failedActions.addAll(regionReq.actions);
      }
    });
    if (!failedActions.isEmpty()) {
      tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue());
    }
  }

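  // Send the grouped actions to their target servers as MultiRequests, capping each rpc timeout
  // by the remaining operation timeout.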
  private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) {
    long remainingNs;
    if (operationTimeoutNs > 0) {
      remainingNs = remainingTimeNs();
      if (remainingNs <= 0) {
        failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
          .flatMap(r -> r.actions.stream()), tries);
        return;
      }
    } else {
      remainingNs = Long.MAX_VALUE;
    }
    actionsByServer.forEach((sn, serverReq) -> {
      ClientService.Interface stub;
      try {
        stub = conn.getRegionServerStub(sn);
      } catch (IOException e) {
        onError(serverReq.actionsByRegion, tries, e, sn);
        return;
      }
      ClientProtos.MultiRequest req;
      List<CellScannable> cells = new ArrayList<>();
      // Map from a created RegionAction to the original index for a RowMutations within
      // the original list of actions. This will be used to process the results when the
      // action list contains RowMutations.
      Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
      try {
        req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
      } catch (IOException e) {
        onError(serverReq.actionsByRegion, tries, e, sn);
        return;
      }
      HBaseRpcController controller = conn.rpcControllerFactory.newController();
      resetController(controller, Math.min(rpcTimeoutNs, remainingNs));
      if (!cells.isEmpty()) {
        controller.setCellScanner(createCellScanner(cells));
      }
      stub.multi(controller, req, resp -> {
        if (controller.failed()) {
          onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
        } else {
          try {
            onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,
              rowMutationsIndexMap, resp, controller.cellScanner()));
          } catch (Exception e) {
            onError(serverReq.actionsByRegion, tries, e, sn);
            return;
          }
        }
      });
    });
  }

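  // Handle a failure of the whole request to one server: clear the cached locations, then either
  // fail all the affected actions or resubmit them.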
  private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
      ServerName serverName) {
    Throwable error = translateException(t);
    logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
    actionsByRegion.forEach(
      (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
      failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
        serverName);
      return;
    }
    List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
      .collect(Collectors.toList());
    addError(copiedActions, error, serverName);
    tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException);
  }

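  // Schedule another attempt for the given actions, either immediately or after a backoff delay
  // computed from the pause time and the number of tries, as long as time remains.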
  private void tryResubmit(Stream<Action> actions, int tries, boolean immediately) {
    if (immediately) {
      groupAndSend(actions, tries);
      return;
    }
    long delayNs;
    if (operationTimeoutNs > 0) {
      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
      if (maxDelayNs <= 0) {
        failAll(actions, tries);
        return;
      }
      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
    } else {
      delayNs = getPauseTime(pauseNs, tries - 1);
    }
    retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
  }

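  // Locate the region for each action, group the actions by target server and region, then send
  // the grouped requests; actions whose locations cannot be resolved are resubmitted.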
  private void groupAndSend(Stream<Action> actions, int tries) {
    long locateTimeoutNs;
    if (operationTimeoutNs > 0) {
      locateTimeoutNs = remainingTimeNs();
      if (locateTimeoutNs <= 0) {
        failAll(actions, tries);
        return;
      }
    } else {
      locateTimeoutNs = -1L;
    }
    ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
    ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
    addListener(CompletableFuture.allOf(actions
      .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
        RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
          if (error != null) {
            error = unwrapCompletionException(translateException(error));
            if (error instanceof DoNotRetryIOException) {
              failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
              return;
            }
            addError(action, error, null);
            locateFailed.add(action);
          } else {
            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
              action);
          }
        }))
      .toArray(CompletableFuture[]::new)), (v, r) -> {
        if (!actionsByServer.isEmpty()) {
          send(actionsByServer, tries);
        }
        if (!locateFailed.isEmpty()) {
          tryResubmit(locateFailed.stream(), tries, false);
        }
      });
  }

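  // Kick off the first attempt and return one future per input action, in the original order.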
  public List<CompletableFuture<T>> call() {
    groupAndSend(actions.stream(), 1);
    return futures;
  }
}