/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class allows a continuous flow of requests. It's written to be compatible with a
 * synchronous caller such as HTable.
 * <p>
 * The caller sends a buffer of operations, by calling submit. This class extracts from this list
 * the operations it can send, i.e. the operations that are on regions that are not considered
 * as busy. The process is asynchronous, i.e. it returns immediately once it has finished
 * iterating over the list. If, and only if, the maximum number of concurrent tasks is reached,
 * the call to submit will block. Alternatively, the caller can call submitAll, in which case all
 * the operations will be sent. Each call to submit returns a future-like object that can be used
 * to track operation progress.
 * </p>
 * <p>
 * The class manages the retries internally.
 * </p>
 * <p>
 * The errors are tracked inside the Future object that is returned.
 * The results are always tracked inside the Future object and can be retrieved when the call
 * has finished. Partial results can also be retrieved if some part of a multi-request failed.
 * </p>
 * <p>
 * This class is thread safe.
 * Internally, the class is thread safe enough to manage new submissions and the results arising
 * from older operations simultaneously.
 * </p>
 * <p>
 * Internally, this class works with {@link Row}, which means it could theoretically be used for
 * gets as well.
 * </p>
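 * <p>
 * A minimal usage sketch (illustrative only, not taken from this class): it assumes a task built
 * with the {@link AsyncProcessTask} builder, an existing {@code AsyncProcess} instance named
 * {@code asyncProcess}, and caller-supplied {@code pool}, {@code tableName} and {@code puts}.
 * </p>
 * <pre>{@code
 * AsyncProcessTask task = AsyncProcessTask.newBuilder()
 *     .setPool(pool)                       // ExecutorService that runs the requests
 *     .setTableName(tableName)
 *     .setRowAccess(puts)                  // the buffer of operations, e.g. a List of Puts
 *     .setSubmittedRows(SubmittedRows.ALL) // send everything, like submitAll
 *     .setOperationTimeout(60000)
 *     .setRpcTimeout(10000)
 *     .build();
 * AsyncRequestFuture future = asyncProcess.submit(task);
 * future.waitUntilDone();                  // block until every operation has a result or error
 * if (future.hasError()) {
 *   throw future.getErrors();              // RetriesExhaustedWithDetailsException with details
 * }
 * }</pre>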
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class AsyncProcess {
  private static final Logger LOG = LoggerFactory.getLogger(AsyncProcess.class);
  private static final AtomicLong COUNTER = new AtomicLong();

  public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";

  /**
   * Configure the number of failures after which the client will start logging. A few failures
   * are fine: the region moved, then is not yet opened, then is overloaded. We try to have an
   * acceptable heuristic for the number of errors we don't log. 5 was chosen because we wait
   * for 1s at this stage.
   */
  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
      "hbase.client.start.log.errors.counter";
  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;
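
  // A hedged illustration, not from the original source: a client that wants every failed
  // attempt logged could lower the threshold in its Configuration before creating the
  // connection, e.g.
  //   conf.setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
  // while the default of DEFAULT_START_LOG_ERRORS_AFTER_COUNT keeps the first few transient
  // failures out of the logs.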

  /**
   * Configuration to decide whether to log details for batch error
   */
  public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";

  /**
   * Return value from a submit that didn't contain any requests.
   */
  private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
    final Object[] result = new Object[0];

    @Override
    public boolean hasError() {
      return false;
    }

    @Override
    public RetriesExhaustedWithDetailsException getErrors() {
      return null;
    }

    @Override
    public List<? extends Row> getFailedOperations() {
      return null;
    }

    @Override
    public Object[] getResults() {
      return result;
    }

    @Override
    public void waitUntilDone() throws InterruptedIOException {
    }
  };

  // TODO: many of the fields should be made private
  final long id;

  final ClusterConnection connection;
  private final RpcRetryingCallerFactory rpcCallerFactory;
  final RpcControllerFactory rpcFactory;

  // Start configuration settings.
  final int startLogErrorsCnt;

  final long pause;
  final long pauseForCQTBE; // pause for CallQueueTooBigException, if specified
  final int numTries;
  long serverTrackerTimeout;
  final long primaryCallTimeoutMicroseconds;
  /** Whether to log details for batch errors */
  final boolean logBatchErrorDetails;
  // End configuration settings.

  /**
   * The traffic control for requests.
   */
  final RequestController requestController;
  public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
  private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
  private final int periodToLog;

  AsyncProcess(ClusterConnection hc, Configuration conf,
      RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
    if (hc == null) {
      throw new IllegalArgumentException("ClusterConnection cannot be null.");
    }

    this.connection = hc;

    this.id = COUNTER.incrementAndGet();

    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
    if (configuredPauseForCQTBE < pause) {
      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
          + ", will use " + pause + " instead.");
      this.pauseForCQTBE = pause;
    } else {
      this.pauseForCQTBE = configuredPauseForCQTBE;
    }
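    // An illustrative sketch, not from the original source: if HBASE_CLIENT_PAUSE resolves to
    // 100 ms and HBASE_CLIENT_PAUSE_FOR_CQTBE to 500 ms, retries triggered by a
    // CallQueueTooBigException use 500 ms as the base pause for backoff, while other retriable
    // errors keep using 100 ms; a CQTBE pause smaller than the normal pause is ignored, as the
    // warning above explains.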
    // how many times we could try in total, one more than the retry number
    this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
    this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
    this.startLogErrorsCnt =
        conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
    this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, DEFAULT_LOG_DETAILS_PERIOD);
    // Server tracker allows us to do faster, and yet useful (hopefully), retries.
    // However, if we are too useful, we might fail very quickly due to the retry count limit.
    // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate the maximum
    // retry time if normal retries were used. Then we will retry until this time runs out.
    // If we keep hitting one server, the net effect will be the incremental backoff, and
    // essentially the same number of retries as planned. If we have to do faster retries,
    // we will do more retries in aggregate, but the user will be none the wiser.
    this.serverTrackerTimeout = 0L;
    for (int i = 0; i < this.numTries; ++i) {
      serverTrackerTimeout = serverTrackerTimeout + ConnectionUtils.getPauseTime(this.pause, i);
    }
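    // An illustrative sketch, assuming the usual exponential backoff table behind
    // ConnectionUtils.getPauseTime: with pause = 100 ms and numTries = 4 the cap is roughly
    // 100*1 + 100*2 + 100*3 + 100*5 = 1100 ms (plus a small jitter), i.e. about the total wait
    // that the normal number of retries would have accumulated.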

    this.rpcCallerFactory = rpcCaller;
    this.rpcFactory = rpcFactory;
    this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);

    this.requestController = RequestControllerFactory.create(conf);
  }

  /**
   * The submitted task may not be accomplished at all if there are too many running tasks or
   * other limits.
   * @param <CResult> The class to cast the result
   * @param task The setting and data
   * @return AsyncRequestFuture
   */
  public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task)
      throws InterruptedIOException {
    AsyncRequestFuture reqFuture = checkTask(task);
    if (reqFuture != null) {
      return reqFuture;
    }
    SubmittedRows submittedRows =
        task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows();
    switch (submittedRows) {
      case ALL:
        return submitAll(task);
      case AT_LEAST_ONE:
        return submit(task, true);
      default:
        return submit(task, false);
    }
  }

  /**
   * Extract from the rows list what we can submit. The rows we cannot submit are kept in the
   * list. Does not send requests to replicas (not currently used for anything other
   * than streaming puts anyway).
   *
   * @param task The setting and data
   * @param atLeastOne true if we should submit at least a subset.
   */
  private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
    boolean atLeastOne) throws InterruptedIOException {
    TableName tableName = task.getTableName();
    RowAccess<? extends Row> rows = task.getRowAccess();
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
    List<Action> retainedActions = new ArrayList<>(rows.size());

    NonceGenerator ng = this.connection.getNonceGenerator();
    long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.

    // Location errors that happen before we decide what requests to take.
    List<Exception> locationErrors = null;
    List<Integer> locationErrorRows = null;
    RequestController.Checker checker = requestController.newChecker();
    boolean firstIter = true;
    do {
      // Wait until there is at least one slot for a new task.
      requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1));
      int posInList = -1;
      if (!firstIter) {
        checker.reset();
      }
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        HRegionLocation loc;
        try {
          if (r == null) {
            throw new IllegalArgumentException("#" + id + ", row cannot be null");
          }
          // Make sure we get 0-s replica.
          RegionLocations locs = connection.locateRegion(
              tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
          if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
            throw new IOException("#" + id + ", no location found, aborting submit for"
                + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
          }
          loc = locs.getDefaultRegionLocation();
        } catch (IOException ex) {
          locationErrors = new ArrayList<>(1);
          locationErrorRows = new ArrayList<>(1);
          LOG.error("Failed to get region location ", ex);
          // This action failed before creating ars. Retain it, but do not add to submit list.
          // We will then add it to ars in an already-failed state.

          int priority = HConstants.NORMAL_QOS;
          if (r instanceof Mutation) {
            priority = ((Mutation) r).getPriority();
          }
          retainedActions.add(new Action(r, ++posInList, priority));
          locationErrors.add(ex);
          locationErrorRows.add(posInList);
          it.remove();
          break; // Backward compat: we stop considering actions on location error.
        }
        ReturnCode code = checker.canTakeRow(loc, r);
        if (code == ReturnCode.END) {
          break;
        }
        if (code == ReturnCode.INCLUDE) {
          int priority = HConstants.NORMAL_QOS;
          if (r instanceof Mutation) {
            priority = ((Mutation) r).getPriority();
          }
          Action action = new Action(r, ++posInList, priority);
          setNonce(ng, r, action);
          retainedActions.add(action);
          // TODO: replica-get is not supported on this path
          byte[] regionName = loc.getRegionInfo().getRegionName();
          addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
          it.remove();
        }
      }
      firstIter = false;
    } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));

    if (retainedActions.isEmpty()) return NO_REQS_RESULT;

    return submitMultiActions(task, retainedActions, nonceGroup,
        locationErrors, locationErrorRows, actionsByServer);
  }

  <CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task,
      List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors,
      List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) {
    AsyncRequestFutureImpl<CResult> ars =
        createAsyncRequestFuture(task, retainedActions, nonceGroup);
    // Add location errors if any
    if (locationErrors != null) {
      for (int i = 0; i < locationErrors.size(); ++i) {
        int originalIndex = locationErrorRows.get(i);
        Row row = retainedActions.get(originalIndex).getAction();
        ars.manageError(originalIndex, row,
            AsyncRequestFutureImpl.Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
      }
    }
    ars.sendMultiAction(actionsByServer, 1, null, false);
    return ars;
  }

  /**
   * Helper that is used when grouping the actions per region server.
   *
   * @param server the server the action goes to
   * @param regionName the name of the region the action targets
   * @param action the action to add to the multiaction
   * @param actionsByServer the multiaction per server
   * @param nonceGroup Nonce group.
   */
  static void addAction(ServerName server, byte[] regionName, Action action,
      Map<ServerName, MultiAction> actionsByServer, long nonceGroup) {
    MultiAction multiAction = actionsByServer.get(server);
    if (multiAction == null) {
      multiAction = new MultiAction();
      actionsByServer.put(server, multiAction);
    }
    if (action.hasNonce() && !multiAction.hasNonceGroup()) {
      multiAction.setNonceGroup(nonceGroup);
    }

    multiAction.add(regionName, action);
  }

  /**
   * Submit the list of rows immediately, whatever the server status. Kept for backward
   * compatibility: it allows this class to be used with the batch interface that returns an
   * array of objects.
   * @param task The setting and data
   */
  private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) {
    RowAccess<? extends Row> rows = task.getRowAccess();
    List<Action> actions = new ArrayList<>(rows.size());

    // The position will be used by the processBatch to match the object array returned.
    int posInList = -1;
    NonceGenerator ng = this.connection.getNonceGenerator();
    int highestPriority = HConstants.PRIORITY_UNSET;
    for (Row r : rows) {
      posInList++;
      if (r instanceof Put) {
        Put put = (Put) r;
        if (put.isEmpty()) {
          throw new IllegalArgumentException(
              "No columns to insert for #" + (posInList + 1) + " item");
        }
        highestPriority = Math.max(put.getPriority(), highestPriority);
      }
      Action action = new Action(r, posInList, highestPriority);
      setNonce(ng, r, action);
      actions.add(action);
    }
    AsyncRequestFutureImpl<CResult> ars =
        createAsyncRequestFuture(task, actions, ng.getNonceGroup());
    ars.groupAndSendMultiAction(actions, 1);
    return ars;
  }

  private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) {
    if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) {
      return NO_REQS_RESULT;
    }
    Objects.requireNonNull(task.getPool(), "The pool can't be NULL");
    checkOperationTimeout(task.getOperationTimeout());
    checkRpcTimeout(task.getRpcTimeout());
    return null;
  }

  private void setNonce(NonceGenerator ng, Row r, Action action) {
    if (!(r instanceof Append) && !(r instanceof Increment)) return;
    action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
  }

  private int checkTimeout(String name, int timeout) {
    if (timeout < 0) {
      throw new RuntimeException("The " + name + " must not be negative,"
        + " current value is " + timeout);
    }
    return timeout;
  }

  private int checkOperationTimeout(int operationTimeout) {
    return checkTimeout("operation timeout", operationTimeout);
  }

  private int checkRpcTimeout(int rpcTimeout) {
    return checkTimeout("rpc timeout", rpcTimeout);
  }

  <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
      AsyncProcessTask task, List<Action> actions, long nonceGroup) {
    return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this);
  }

  /** Wait until the async process does not have more than max tasks in progress. */
  protected void waitForMaximumCurrentTasks(int max, TableName tableName)
      throws InterruptedIOException {
    requestController.waitForMaximumCurrentTasks(max, id, periodToLog,
      getLogger(tableName, max));
  }

  private Consumer<Long> getLogger(TableName tableName, long max) {
    return (currentInProgress) -> {
      LOG.info("#" + id + (max < 0 ?
          ", waiting for any free slot" :
          ", waiting for some tasks to finish. Expected max=" + max) + ", tasksInProgress="
          + currentInProgress + (tableName == null ? "" : ", tableName=" + tableName));
    };
  }

  void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
    requestController.incTaskCounters(regions, sn);
  }

  void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
    requestController.decTaskCounters(regions, sn);
  }

  /**
   * Create a caller. Isolated to be easily overridden in the tests.
   */
  protected RpcRetryingCaller<AbstractResponse> createCaller(
      CancellableRegionServerCallable callable, int rpcTimeout) {
    return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout));
  }

  /**
   * Creates the server error tracker to use inside the process.
   * Currently, to preserve the main assumption about current retries, and to work well with
   * the retry-limit-based calculation, the calculation is local per Process object.
   * We may benefit from connection-wide tracking of server errors.
   * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
   */
  ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(
        this.serverTrackerTimeout, this.numTries);
  }

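  // For reference, a hedged sketch of how a caller marks a read as replica-capable (standard
  // client API, not something defined in this file):
  //   Get get = new Get(row);
  //   get.setConsistency(Consistency.TIMELINE);
  // Only such Gets are treated as replica gets by the check below.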
  static boolean isReplicaGet(Row row) {
    return (row instanceof Get) && (((Get) row).getConsistency() == Consistency.TIMELINE);
  }

}