001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Objects;
029import java.util.concurrent.atomic.AtomicLong;
030import java.util.function.Consumer;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.RegionLocations;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
038import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
039import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.yetus.audience.InterfaceStability;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
/**
 * This class allows a continuous flow of requests. It's written to be compatible with a synchronous
 * caller such as HTable.
 * <p>
 * The caller sends a buffer of operations by calling submit. This class extracts from this list
 * the operations it can send, i.e. the operations that are on regions that are not considered
 * busy. The process is asynchronous, i.e. it returns immediately once it has finished iterating
 * over the list. If, and only if, the maximum number of concurrent tasks is reached, the call to
 * submit will block. Alternatively, the caller can call submitAll, in which case all the
 * operations will be sent. Each call to submit returns a future-like object that can be used to
 * track operation progress.
 * </p>
 * <p>
 * The class manages the retries internally.
 * </p>
 * <p>
 * The errors are tracked inside the Future object that is returned. The results are always tracked
 * inside the Future object and can be retrieved when the call has finished. Partial results can
 * also be retrieved if some part of a multi-request failed.
 * </p>
 * <p>
 * This class is thread safe. Internally, the class is thread safe enough to simultaneously manage
 * new submissions and results arising from older operations.
 * </p>
 * <p>
 * Internally, this class works with {@link Row}, which means it could theoretically be used for
 * gets as well.
 * </p>
 */
075@InterfaceAudience.Private
076@InterfaceStability.Evolving
077class AsyncProcess {
078  private static final Logger LOG = LoggerFactory.getLogger(AsyncProcess.class);
079  private static final AtomicLong COUNTER = new AtomicLong();
080
081  public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
082
083  /**
084   * Configure the number of failures after which the client will start logging. A few failures is
085   * fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
086   * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at this
087   * stage.
088   */
089  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
090    "hbase.client.start.log.errors.counter";
091  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;
092
093  /**
094   * Configuration to decide whether to log details for batch error
095   */
096  public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
097
098  /**
099   * Return value from a submit that didn't contain any requests.
100   */
101  private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
102    final Object[] result = new Object[0];
103
104    @Override
105    public boolean hasError() {
106      return false;
107    }
108
109    @Override
110    public RetriesExhaustedWithDetailsException getErrors() {
111      return null;
112    }
113
114    @Override
115    public List<? extends Row> getFailedOperations() {
116      return null;
117    }
118
119    @Override
120    public Object[] getResults() {
121      return result;
122    }
123
124    @Override
125    public void waitUntilDone() throws InterruptedIOException {
126    }
127  };
128
129  // TODO: many of the fields should be made private
130  final long id;
131
132  final ClusterConnection connection;
133  final ConnectionConfiguration connectionConfiguration;
134  private final RpcRetryingCallerFactory rpcCallerFactory;
135  final RpcControllerFactory rpcFactory;
136
137  // Start configuration settings.
138  final int startLogErrorsCnt;
139
140  final int numTries;
141  long serverTrackerTimeout;
142  final long primaryCallTimeoutMicroseconds;
143  /** Whether to log details for batch errors */
144  final boolean logBatchErrorDetails;
145  // End configuration settings.
146
147  /**
148   * The traffic control for requests.
149   */
150  final RequestController requestController;
151  public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
152  private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
153  private final int periodToLog;
154
155  AsyncProcess(ClusterConnection hc, Configuration conf, RpcRetryingCallerFactory rpcCaller,
156    RpcControllerFactory rpcFactory) {
157    this(hc, conf, rpcCaller, rpcFactory, hc.getConnectionConfiguration().getRetriesNumber());
158  }
159
160  AsyncProcess(ClusterConnection hc, Configuration conf, RpcRetryingCallerFactory rpcCaller,
161    RpcControllerFactory rpcFactory, int retriesNumber) {
162    if (hc == null) {
163      throw new IllegalArgumentException("ClusterConnection cannot be null.");
164    }
165
166    this.connection = hc;
167    this.connectionConfiguration = connection.getConnectionConfiguration();
168
169    this.id = COUNTER.incrementAndGet();
170
171    // how many times we could try in total, one more than retry number
172    this.numTries = retriesNumber + 1;
173    this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
174    this.startLogErrorsCnt =
175      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
176    this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, DEFAULT_LOG_DETAILS_PERIOD);
177    // Server tracker allows us to do faster, and yet useful (hopefully), retries.
178    // However, if we are too useful, we might fail very quickly due to retry count limit.
179    // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
180    // retry time if normal retries were used. Then we will retry until this time runs out.
181    // If we keep hitting one server, the net effect will be the incremental backoff, and
182    // essentially the same number of retries as planned. If we have to do faster retries,
183    // we will do more retries in aggregate, but the user will be none the wiser.
184    this.serverTrackerTimeout = 0L;
185    for (int i = 0; i < this.numTries; ++i) {
186      serverTrackerTimeout = serverTrackerTimeout
187        + ConnectionUtils.getPauseTime(connectionConfiguration.getPauseMillis(), i);
188    }
189
190    this.rpcCallerFactory = rpcCaller;
191    this.rpcFactory = rpcFactory;
192    this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
193
194    this.requestController = RequestControllerFactory.create(conf);
195  }
196
197  /**
198   * The submitted task may be not accomplished at all if there are too many running tasks or other
199   * limits.
200   * @param <CResult> The class to cast the result
201   * @param task      The setting and data n
202   */
203  public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task)
204    throws InterruptedIOException {
205    AsyncRequestFuture reqFuture = checkTask(task);
206    if (reqFuture != null) {
207      return reqFuture;
208    }
209    SubmittedRows submittedRows =
210      task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows();
211    switch (submittedRows) {
212      case ALL:
213        return submitAll(task);
214      case AT_LEAST_ONE:
215        return submit(task, true);
216      default:
217        return submit(task, false);
218    }
219  }
220
221  /**
222   * Extract from the rows list what we can submit. The rows we can not submit are kept in the list.
223   * Does not send requests to replicas (not currently used for anything other than streaming puts
224   * anyway).
225   * @param task       The setting and data
226   * @param atLeastOne true if we should submit at least a subset.
227   */
228  private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task, boolean atLeastOne)
229    throws InterruptedIOException {
230    TableName tableName = task.getTableName();
231    RowAccess<? extends Row> rows = task.getRowAccess();
232    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
233    List<Action> retainedActions = new ArrayList<>(rows.size());
234
235    NonceGenerator ng = this.connection.getNonceGenerator();
236    long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
237
238    // Location errors that happen before we decide what requests to take.
239    List<Exception> locationErrors = null;
240    List<Integer> locationErrorRows = null;
241    RequestController.Checker checker = requestController.newChecker();
242    boolean firstIter = true;
243    do {
244      // Wait until there is at least one slot for a new task.
245      requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1));
246      int posInList = -1;
247      if (!firstIter) {
248        checker.reset();
249      }
250      Iterator<? extends Row> it = rows.iterator();
251      while (it.hasNext()) {
252        Row r = it.next();
253        HRegionLocation loc;
254        try {
255          if (r == null) {
256            throw new IllegalArgumentException("#" + id + ", row cannot be null");
257          }
258          // Make sure we get 0-s replica.
259          RegionLocations locs = connection.locateRegion(tableName, r.getRow(), true, true,
260            RegionReplicaUtil.DEFAULT_REPLICA_ID);
261          if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
262            throw new IOException("#" + id + ", no location found, aborting submit for"
263              + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
264          }
265          loc = locs.getDefaultRegionLocation();
266        } catch (IOException ex) {
267          locationErrors = new ArrayList<>(1);
268          locationErrorRows = new ArrayList<>(1);
269          LOG.error("Failed to get region location ", ex);
270          // This action failed before creating ars. Retain it, but do not add to submit list.
271          // We will then add it to ars in an already-failed state.
272
273          int priority = HConstants.NORMAL_QOS;
274          if (r instanceof Mutation) {
275            priority = ((Mutation) r).getPriority();
276          }
277          retainedActions.add(new Action(r, ++posInList, priority));
278          locationErrors.add(ex);
279          locationErrorRows.add(posInList);
280          it.remove();
281          break; // Backward compat: we stop considering actions on location error.
282        }
283        ReturnCode code = checker.canTakeRow(loc, r);
284        if (code == ReturnCode.END) {
285          break;
286        }
287        if (code == ReturnCode.INCLUDE) {
288          int priority = HConstants.NORMAL_QOS;
289          if (r instanceof Mutation) {
290            priority = ((Mutation) r).getPriority();
291          }
292          Action action = new Action(r, ++posInList, priority);
293          setNonce(ng, r, action);
294          retainedActions.add(action);
295          // TODO: replica-get is not supported on this path
296          byte[] regionName = loc.getRegionInfo().getRegionName();
297          addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
298          it.remove();
299        }
300      }
301      firstIter = false;
302    } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
303
304    if (retainedActions.isEmpty()) return NO_REQS_RESULT;
305
306    return submitMultiActions(task, retainedActions, nonceGroup, locationErrors, locationErrorRows,
307      actionsByServer);
308  }
309
310  <CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task,
311    List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors,
312    List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) {
313    AsyncRequestFutureImpl<CResult> ars =
314      createAsyncRequestFuture(task, retainedActions, nonceGroup);
315    // Add location errors if any
316    if (locationErrors != null) {
317      for (int i = 0; i < locationErrors.size(); ++i) {
318        int originalIndex = locationErrorRows.get(i);
319        Row row = retainedActions.get(originalIndex).getAction();
320        ars.manageError(originalIndex, row, AsyncRequestFutureImpl.Retry.NO_LOCATION_PROBLEM,
321          locationErrors.get(i), null);
322      }
323    }
324    ars.sendMultiAction(actionsByServer, 1, null, false);
325    return ars;
326  }
327
328  /**
329   * Helper that is used when grouping the actions per region server.
330   * @param server          - server
331   * @param regionName      - regionName
332   * @param action          - the action to add to the multiaction
333   * @param actionsByServer the multiaction per server
334   * @param nonceGroup      Nonce group.
335   */
336  static void addAction(ServerName server, byte[] regionName, Action action,
337    Map<ServerName, MultiAction> actionsByServer, long nonceGroup) {
338    MultiAction multiAction = actionsByServer.get(server);
339    if (multiAction == null) {
340      multiAction = new MultiAction();
341      actionsByServer.put(server, multiAction);
342    }
343    if (action.hasNonce() && !multiAction.hasNonceGroup()) {
344      multiAction.setNonceGroup(nonceGroup);
345    }
346
347    multiAction.add(regionName, action);
348  }
349
350  /**
351   * Submit immediately the list of rows, whatever the server status. Kept for backward
352   * compatibility: it allows to be used with the batch interface that return an array of objects.
353   * @param task The setting and data
354   */
355  private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) {
356    RowAccess<? extends Row> rows = task.getRowAccess();
357    List<Action> actions = new ArrayList<>(rows.size());
358
359    // The position will be used by the processBatch to match the object array returned.
360    int posInList = -1;
361    NonceGenerator ng = this.connection.getNonceGenerator();
362    int highestPriority = HConstants.PRIORITY_UNSET;
363    for (Row r : rows) {
364      posInList++;
365      if (r instanceof Put) {
366        Put put = (Put) r;
367        if (put.isEmpty()) {
368          throw new IllegalArgumentException(
369            "No columns to insert for #" + (posInList + 1) + " item");
370        }
371        highestPriority = Math.max(put.getPriority(), highestPriority);
372      }
373      Action action = new Action(r, posInList, highestPriority);
374      setNonce(ng, r, action);
375      actions.add(action);
376    }
377    AsyncRequestFutureImpl<CResult> ars =
378      createAsyncRequestFuture(task, actions, ng.getNonceGroup());
379    ars.groupAndSendMultiAction(actions, 1);
380    return ars;
381  }
382
383  private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) {
384    if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) {
385      return NO_REQS_RESULT;
386    }
387    Objects.requireNonNull(task.getPool(), "The pool can't be NULL");
388    checkOperationTimeout(task.getOperationTimeout());
389    checkRpcTimeout(task.getRpcTimeout());
390    return null;
391  }
392
393  private void setNonce(NonceGenerator ng, Row r, Action action) {
394    if (hasIncrementOrAppend(r)) {
395      action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
396    }
397  }
398
399  private static boolean hasIncrementOrAppend(Row action) {
400    if (action instanceof Append || action instanceof Increment) {
401      return true;
402    } else if (action instanceof RowMutations) {
403      return hasIncrementOrAppend((RowMutations) action);
404    } else if (action instanceof CheckAndMutate) {
405      return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
406    }
407    return false;
408  }
409
410  private static boolean hasIncrementOrAppend(RowMutations mutations) {
411    for (Mutation mutation : mutations.getMutations()) {
412      if (mutation instanceof Append || mutation instanceof Increment) {
413        return true;
414      }
415    }
416    return false;
417  }
418
419  private int checkTimeout(String name, int timeout) {
420    if (timeout < 0) {
421      throw new RuntimeException(
422        "The " + name + " must be bigger than zero," + "current value is" + timeout);
423    }
424    return timeout;
425  }
426
427  private int checkOperationTimeout(int operationTimeout) {
428    return checkTimeout("operation timeout", operationTimeout);
429  }
430
431  private int checkRpcTimeout(int rpcTimeout) {
432    return checkTimeout("rpc timeout", rpcTimeout);
433  }
434
435  <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(AsyncProcessTask task,
436    List<Action> actions, long nonceGroup) {
437    return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this);
438  }
439
440  /** Wait until the async does not have more than max tasks in progress. */
441  protected void waitForMaximumCurrentTasks(int max, TableName tableName)
442    throws InterruptedIOException {
443    requestController.waitForMaximumCurrentTasks(max, id, periodToLog, getLogger(tableName, max));
444  }
445
446  private Consumer<Long> getLogger(TableName tableName, long max) {
447    return (currentInProgress) -> {
448      LOG.info("#" + id
449        + (max < 0
450          ? ", waiting for any free slot"
451          : ", waiting for some tasks to finish. Expected max=" + max)
452        + ", tasksInProgress=" + currentInProgress
453        + (tableName == null ? "" : ", tableName=" + tableName));
454    };
455  }
456
457  void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
458    requestController.incTaskCounters(regions, sn);
459  }
460
461  void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
462    requestController.decTaskCounters(regions, sn);
463  }
464
465  /**
466   * Create a caller. Isolated to be easily overridden in the tests.
467   */
468  protected RpcRetryingCaller<AbstractResponse>
469    createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
470    return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout));
471  }
472
473  /**
474   * Creates the server error tracker to use inside process. Currently, to preserve the main
475   * assumption about current retries, and to work well with the retry-limit-based calculation, the
476   * calculation is local per Process object. We may benefit from connection-wide tracking of server
477   * errors.
478   * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
479   */
480  ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
481    return new ConnectionImplementation.ServerErrorTracker(this.serverTrackerTimeout,
482      this.numTries);
483  }
484
485  static boolean isReplicaGet(Row row) {
486    return (row instanceof Get) && (((Get) row).getConsistency() == Consistency.TIMELINE);
487  }
488
489}