001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
025import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
026import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
027import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Collections;
032import java.util.HashMap;
033import java.util.IdentityHashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Optional;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.ConcurrentSkipListMap;
042import java.util.concurrent.TimeUnit;
043import java.util.function.Supplier;
044import java.util.stream.Collectors;
045import java.util.stream.Stream;
046import org.apache.hadoop.hbase.CellScannable;
047import org.apache.hadoop.hbase.DoNotRetryIOException;
048import org.apache.hadoop.hbase.HRegionLocation;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
052import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
053import org.apache.hadoop.hbase.ipc.HBaseRpcController;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hbase.thirdparty.io.netty.util.Timer;
061
062import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
063import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
066
/**
 * Retry caller for batch.
 * <p>
 * Notice that the {@link #operationTimeoutNs} is now the total time limit for the whole batch,
 * which is the same as for other single operations.
 * <p>
 * And the {@link #maxAttempts} is logically a limit for each single operation in the batch. In
 * the implementation, we record a {@code tries} parameter for each operation group, and if a
 * group is split into several groups when retrying, the sub groups inherit the {@code tries}.
 * You can imagine the whole retrying process as a tree, where {@link #maxAttempts} is the limit
 * of the depth of the tree.
 */
079@InterfaceAudience.Private
080class AsyncBatchRpcRetryingCaller<T> {
081
  private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);

  // Timer on which delayed retries are scheduled.
  private final Timer retryTimer;

  // Connection that supplies the region locator, the region server stubs, the rpc controllers
  // and the nonce generator used by this caller.
  private final AsyncConnectionImpl conn;

  // Table whose regions are located for every action of the batch.
  private final TableName tableName;

  // The submitted rows wrapped as Actions, in submission order.
  private final List<Action> actions;

  // One future per action, in the same order as 'actions'; this is the list returned by call().
  private final List<CompletableFuture<T>> futures;

  // Identity-keyed lookup from an action to its future.
  private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;

  // Errors collected so far for actions that are still being retried. The map itself is only
  // read and written while synchronized on it.
  private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;

  // Base pause handed to getPauseTime when computing the delay before a retry.
  private final long pauseNs;

  // Attempt limit; it is compared against the per-group 'tries' counter when errors are handled.
  private final int maxAttempts;

  // Time budget for the whole batch, measured from startNs. A value <= 0 disables the deadline
  // checks.
  private final long operationTimeoutNs;

  // Timeout for a single multi rpc; capped by the remaining operation time when sending.
  private final long rpcTimeoutNs;

  // Failures are only logged once 'tries' exceeds this value.
  private final int startLogErrorsCnt;

  // System.nanoTime() captured at the end of construction; origin for the operation timeout.
  private final long startNs;
109
  /**
   * The actions that target one region, together with the location resolved for that region.
   * <p>
   * We can not use HRegionLocation as the map key because the hashCode and equals methods of
   * HRegionLocation only consider serverName, so the location is carried in this holder instead.
   */
  private static final class RegionRequest {

    // Location that was resolved for the region when the actions were grouped.
    public final HRegionLocation loc;

    // Actions routed to this region; a concurrent queue is used for the additions.
    public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();

    public RegionRequest(HRegionLocation loc) {
      this.loc = loc;
    }
  }
122
123  private static final class ServerRequest {
124
125    public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
126      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
127
128    public void addAction(HRegionLocation loc, Action action) {
129      computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
130        () -> new RegionRequest(loc)).actions.add(action);
131    }
132  }
133
134  public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
135      TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
136      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
137    this.retryTimer = retryTimer;
138    this.conn = conn;
139    this.tableName = tableName;
140    this.pauseNs = pauseNs;
141    this.maxAttempts = maxAttempts;
142    this.operationTimeoutNs = operationTimeoutNs;
143    this.rpcTimeoutNs = rpcTimeoutNs;
144    this.startLogErrorsCnt = startLogErrorsCnt;
145
146    this.actions = new ArrayList<>(actions.size());
147    this.futures = new ArrayList<>(actions.size());
148    this.action2Future = new IdentityHashMap<>(actions.size());
149    for (int i = 0, n = actions.size(); i < n; i++) {
150      Row rawAction = actions.get(i);
151      Action action = new Action(rawAction, i);
152      if (rawAction instanceof Append || rawAction instanceof Increment) {
153        action.setNonce(conn.getNonceGenerator().newNonce());
154      }
155      this.actions.add(action);
156      CompletableFuture<T> future = new CompletableFuture<>();
157      futures.add(future);
158      action2Future.put(action, future);
159    }
160    this.action2Errors = new IdentityHashMap<>();
161    this.startNs = System.nanoTime();
162  }
163
164  private long remainingTimeNs() {
165    return operationTimeoutNs - (System.nanoTime() - startNs);
166  }
167
168  private List<ThrowableWithExtraContext> removeErrors(Action action) {
169    synchronized (action2Errors) {
170      return action2Errors.remove(action);
171    }
172  }
173
174  private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
175      Throwable error, ServerName serverName) {
176    if (tries > startLogErrorsCnt) {
177      String regions =
178        regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
179          .collect(Collectors.joining(",", "[", "]"));
180      LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName +
181        " failed, tries=" + tries, error);
182    }
183  }
184
185  private String getExtraContextForError(ServerName serverName) {
186    return serverName != null ? serverName.getServerName() : "";
187  }
188
189  private void addError(Action action, Throwable error, ServerName serverName) {
190    List<ThrowableWithExtraContext> errors;
191    synchronized (action2Errors) {
192      errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
193    }
194    errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
195      getExtraContextForError(serverName)));
196  }
197
198  private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
199    actions.forEach(action -> addError(action, error, serverName));
200  }
201
202  private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
203    CompletableFuture<T> future = action2Future.get(action);
204    if (future.isDone()) {
205      return;
206    }
207    ThrowableWithExtraContext errorWithCtx =
208      new ThrowableWithExtraContext(error, currentTime, extras);
209    List<ThrowableWithExtraContext> errors = removeErrors(action);
210    if (errors == null) {
211      errors = Collections.singletonList(errorWithCtx);
212    } else {
213      errors.add(errorWithCtx);
214    }
215    future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
216  }
217
218  private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
219    long currentTime = EnvironmentEdgeManager.currentTime();
220    String extras = getExtraContextForError(serverName);
221    actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
222  }
223
224  private void failAll(Stream<Action> actions, int tries) {
225    actions.forEach(action -> {
226      CompletableFuture<T> future = action2Future.get(action);
227      if (future.isDone()) {
228        return;
229      }
230      future.completeExceptionally(new RetriesExhaustedException(tries,
231        Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
232    });
233  }
234
235  private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
236      List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
237    ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
238    ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
239    ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
240    ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
241    for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
242      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
243      // multiRequestBuilder will be populated with region actions.
244      // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
245      // action list.
246      RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells,
247        multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
248        rowMutationsIndexMap);
249    }
250    return multiRequestBuilder.build();
251  }
252
253  @SuppressWarnings("unchecked")
254  private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
255      RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
256    Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
257    if (result == null) {
258      LOG.error("Server " + serverName + " sent us neither result nor exception for row '" +
259        Bytes.toStringBinary(action.getAction().getRow()) + "' of " +
260        regionReq.loc.getRegion().getRegionNameAsString());
261      addError(action, new RuntimeException("Invalid response"), serverName);
262      failedActions.add(action);
263    } else if (result instanceof Throwable) {
264      Throwable error = translateException((Throwable) result);
265      logException(tries, () -> Stream.of(regionReq), error, serverName);
266      conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
267      if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
268        failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
269          getExtraContextForError(serverName));
270      } else {
271        failedActions.add(action);
272      }
273    } else {
274      action2Future.get(action).complete((T) result);
275    }
276  }
277
278  private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
279      ServerName serverName, MultiResponse resp) {
280    List<Action> failedActions = new ArrayList<>();
281    actionsByRegion.forEach((rn, regionReq) -> {
282      RegionResult regionResult = resp.getResults().get(rn);
283      Throwable regionException = resp.getException(rn);
284      if (regionResult != null) {
285        regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
286          regionResult, failedActions, regionException));
287      } else {
288        Throwable error;
289        if (regionException == null) {
290          LOG
291            .error("Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
292          error = new RuntimeException("Invalid response");
293        } else {
294          error = translateException(regionException);
295        }
296        logException(tries, () -> Stream.of(regionReq), error, serverName);
297        conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
298        if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
299          failAll(regionReq.actions.stream(), tries, error, serverName);
300          return;
301        }
302        addError(regionReq.actions, error, serverName);
303        failedActions.addAll(regionReq.actions);
304      }
305    });
306    if (!failedActions.isEmpty()) {
307      tryResubmit(failedActions.stream(), tries);
308    }
309  }
310
311  private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) {
312    long remainingNs;
313    if (operationTimeoutNs > 0) {
314      remainingNs = remainingTimeNs();
315      if (remainingNs <= 0) {
316        failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
317          .flatMap(r -> r.actions.stream()), tries);
318        return;
319      }
320    } else {
321      remainingNs = Long.MAX_VALUE;
322    }
323    actionsByServer.forEach((sn, serverReq) -> {
324      ClientService.Interface stub;
325      try {
326        stub = conn.getRegionServerStub(sn);
327      } catch (IOException e) {
328        onError(serverReq.actionsByRegion, tries, e, sn);
329        return;
330      }
331      ClientProtos.MultiRequest req;
332      List<CellScannable> cells = new ArrayList<>();
333      // Map from a created RegionAction to the original index for a RowMutations within
334      // the original list of actions. This will be used to process the results when there
335      // is RowMutations in the action list.
336      Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
337      try {
338        req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
339      } catch (IOException e) {
340        onError(serverReq.actionsByRegion, tries, e, sn);
341        return;
342      }
343      HBaseRpcController controller = conn.rpcControllerFactory.newController();
344      resetController(controller, Math.min(rpcTimeoutNs, remainingNs));
345      if (!cells.isEmpty()) {
346        controller.setCellScanner(createCellScanner(cells));
347      }
348      stub.multi(controller, req, resp -> {
349        if (controller.failed()) {
350          onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
351        } else {
352          try {
353            onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,
354              rowMutationsIndexMap, resp, controller.cellScanner()));
355          } catch (Exception e) {
356            onError(serverReq.actionsByRegion, tries, e, sn);
357            return;
358          }
359        }
360      });
361    });
362  }
363
364  private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
365      ServerName serverName) {
366    Throwable error = translateException(t);
367    logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
368    actionsByRegion.forEach(
369      (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
370    if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
371      failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
372        serverName);
373      return;
374    }
375    List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
376      .collect(Collectors.toList());
377    addError(copiedActions, error, serverName);
378    tryResubmit(copiedActions.stream(), tries);
379  }
380
381  private void tryResubmit(Stream<Action> actions, int tries) {
382    long delayNs;
383    if (operationTimeoutNs > 0) {
384      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
385      if (maxDelayNs <= 0) {
386        failAll(actions, tries);
387        return;
388      }
389      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
390    } else {
391      delayNs = getPauseTime(pauseNs, tries - 1);
392    }
393    retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
394  }
395
396  private void groupAndSend(Stream<Action> actions, int tries) {
397    long locateTimeoutNs;
398    if (operationTimeoutNs > 0) {
399      locateTimeoutNs = remainingTimeNs();
400      if (locateTimeoutNs <= 0) {
401        failAll(actions, tries);
402        return;
403      }
404    } else {
405      locateTimeoutNs = -1L;
406    }
407    ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
408    ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
409    addListener(CompletableFuture.allOf(actions
410      .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
411        RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
412          if (error != null) {
413            error = unwrapCompletionException(translateException(error));
414            if (error instanceof DoNotRetryIOException) {
415              failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
416              return;
417            }
418            addError(action, error, null);
419            locateFailed.add(action);
420          } else {
421            computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
422              action);
423          }
424        }))
425      .toArray(CompletableFuture[]::new)), (v, r) -> {
426        if (!actionsByServer.isEmpty()) {
427          send(actionsByServer, tries);
428        }
429        if (!locateFailed.isEmpty()) {
430          tryResubmit(locateFailed.stream(), tries);
431        }
432      });
433  }
434
435  public List<CompletableFuture<T>> call() {
436    groupAndSend(actions.stream(), 1);
437    return futures;
438  }
439}