/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * This class allows a continuous flow of requests. It's written to be compatible with a
 * synchronous caller such as HTable.
 * <p>
 * The caller sends a buffer of operations by calling submit. This class extracts from this list
 * the operations it can send, i.e. the operations that are on regions that are not considered
 * busy. The process is asynchronous, i.e. it returns immediately once it has finished iterating
 * over the list. If, and only if, the maximum number of concurrent tasks is reached, the call
 * to submit will block. Alternatively, the caller can call submitAll, in which case all the
 * operations will be sent. Each call to submit returns a future-like object that can be used
 * to track operation progress.
 * </p>
 * <p>
 * The class manages the retries internally.
 * </p>
 * <p>
 * Errors are tracked inside the Future object that is returned.
 * The results are always tracked inside the Future object and can be retrieved when the call
 * has finished. Partial results can also be retrieved if some part of a multi-request failed.
 * </p>
 * <p>
 * This class is thread safe.
 * Internally, the class is thread safe enough to manage simultaneously new submissions and
 * results arising from older operations.
 * </p>
 * <p>
 * Internally, this class works with {@link Row}, which means it could theoretically be used for
 * gets as well.
 * </p>
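 * <p>
 * A minimal usage sketch (assuming the {@link AsyncProcessTask} builder API; the pool, table
 * name, timeouts and list of mutations are caller-supplied placeholders):
 * </p>
 * <pre>{@code
 * AsyncProcessTask task = AsyncProcessTask.newBuilder()
 *     .setPool(pool)                         // caller-supplied ExecutorService
 *     .setTableName(tableName)
 *     .setRowAccess(puts)                    // List<? extends Row>
 *     .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) // or ALL / NORMAL
 *     .setOperationTimeout(operationTimeout)
 *     .setRpcTimeout(rpcTimeout)
 *     .build();
 * AsyncRequestFuture future = asyncProcess.submit(task);
 * future.waitUntilDone();
 * if (future.hasError()) {
 *   throw future.getErrors();
 * }
 * }</pre>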
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class AsyncProcess {
  private static final Logger LOG = LoggerFactory.getLogger(AsyncProcess.class);
  private static final AtomicLong COUNTER = new AtomicLong();

  public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";

  /**
   * Configure the number of failures after which the client will start logging. A few failures
   * are fine: the region moved, then was not yet open, then was overloaded. We try to have an
   * acceptable heuristic for the number of errors we don't log. 5 was chosen because we wait
   * for 1s at this stage.
   */
  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
      "hbase.client.start.log.errors.counter";
  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;
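  // Illustration only (assumed client-side usage, not part of this class): the threshold can be
  // raised through the client Configuration before the connection is created, e.g.
  //   conf.setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 10);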

  /**
   * Configuration to decide whether to log details for batch errors
   */
  public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";

  /**
   * Return value from a submit that didn't contain any requests.
   */
  private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
    final Object[] result = new Object[0];

    @Override
    public boolean hasError() {
      return false;
    }

    @Override
    public RetriesExhaustedWithDetailsException getErrors() {
      return null;
    }

    @Override
    public List<? extends Row> getFailedOperations() {
      return null;
    }

    @Override
    public Object[] getResults() {
      return result;
    }

    @Override
    public void waitUntilDone() throws InterruptedIOException {
    }
  };

  // TODO: many of the fields should be made private
  final long id;

  final ClusterConnection connection;
  private final RpcRetryingCallerFactory rpcCallerFactory;
  final RpcControllerFactory rpcFactory;

  // Start configuration settings.
  final int startLogErrorsCnt;

  final long pause;
  final long pauseForCQTBE; // pause for CallQueueTooBigException, if specified
  final int numTries;
  @VisibleForTesting
  long serverTrackerTimeout;
  final long primaryCallTimeoutMicroseconds;
  /** Whether to log details for batch errors */
  final boolean logBatchErrorDetails;
  // End configuration settings.

  /**
   * The traffic control for requests.
   */
  @VisibleForTesting
  final RequestController requestController;
  public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
  private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
  private final int periodToLog;

  AsyncProcess(ClusterConnection hc, Configuration conf,
      RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
    if (hc == null) {
      throw new IllegalArgumentException("ClusterConnection cannot be null.");
    }

    this.connection = hc;

    this.id = COUNTER.incrementAndGet();

    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
    if (configuredPauseForCQTBE < pause) {
      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
          + ", will use " + pause + " instead.");
      this.pauseForCQTBE = pause;
    } else {
      this.pauseForCQTBE = configuredPauseForCQTBE;
    }
    // How many times we could try in total: one more than the retry number.
    this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
    this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
    this.startLogErrorsCnt =
        conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
    this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, DEFAULT_LOG_DETAILS_PERIOD);
    // The server tracker allows us to do faster, and yet useful (hopefully), retries.
    // However, if retries come too fast, we might exhaust the retry count very quickly.
    // To avoid this, we cheat for now (see HBASE-7659) and calculate the maximum
    // retry time if normal retries were used. Then we will retry until this time runs out.
    // If we keep hitting one server, the net effect will be the incremental backoff, and
    // essentially the same number of retries as planned. If we have to do faster retries,
    // we will do more retries in aggregate, but the user will be none the wiser.
    this.serverTrackerTimeout = 0L;
    for (int i = 0; i < this.numTries; ++i) {
      serverTrackerTimeout = serverTrackerTimeout + ConnectionUtils.getPauseTime(this.pause, i);
    }
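    // The sum above is the total back-off a client would sleep across all planned attempts with
    // the configured pause; the tracker then retries until this time budget runs out rather than
    // until a fixed attempt count is reached.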

    this.rpcCallerFactory = rpcCaller;
    this.rpcFactory = rpcFactory;
    this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);

    this.requestController = RequestControllerFactory.create(conf);
  }

  /**
   * The submitted task may not be accomplished at all if there are too many running tasks or
   * other limits.
   * @param <CResult> The class to cast the result
   * @param task The setting and data
   * @return AsyncRequestFuture
   */
  public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task)
      throws InterruptedIOException {
    AsyncRequestFuture reqFuture = checkTask(task);
    if (reqFuture != null) {
      return reqFuture;
    }
    SubmittedRows submittedRows =
        task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows();
    switch (submittedRows) {
      case ALL:
        return submitAll(task);
      case AT_LEAST_ONE:
        return submit(task, true);
      default:
        return submit(task, false);
    }
  }

  /**
   * Extract from the rows list what we can submit. The rows we cannot submit are kept in the
   * list. Does not send requests to replicas (not currently used for anything other
   * than streaming puts anyway).
   *
   * @param task The setting and data
   * @param atLeastOne true if we should submit at least a subset.
   */
  private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
      boolean atLeastOne) throws InterruptedIOException {
    TableName tableName = task.getTableName();
    RowAccess<? extends Row> rows = task.getRowAccess();
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
    List<Action> retainedActions = new ArrayList<>(rows.size());

    NonceGenerator ng = this.connection.getNonceGenerator();
    long nonceGroup = ng.getNonceGroup(); // Currently, the nonce group is per entire client.

    // Location errors that happen before we decide what requests to take.
    List<Exception> locationErrors = null;
    List<Integer> locationErrorRows = null;
    RequestController.Checker checker = requestController.newChecker();
    boolean firstIter = true;
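    // One pass is enough unless the caller requires at least one submitted row; in that case,
    // keep looping (waiting for capacity to free up) until something is retained or a location
    // error occurs.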
    do {
      // Wait until there is at least one slot for a new task.
      requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1));
      int posInList = -1;
      if (!firstIter) {
        checker.reset();
      }
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        HRegionLocation loc;
        try {
          if (r == null) {
            throw new IllegalArgumentException("#" + id + ", row cannot be null");
          }
          // Make sure we get the default (replica id 0) location.
          RegionLocations locs = connection.locateRegion(
              tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
          if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
            throw new IOException("#" + id + ", no location found, aborting submit for"
                + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
          }
          loc = locs.getDefaultRegionLocation();
        } catch (IOException ex) {
          locationErrors = new ArrayList<>(1);
          locationErrorRows = new ArrayList<>(1);
          LOG.error("Failed to get region location ", ex);
          // This action failed before creating ars. Retain it, but do not add to submit list.
          // We will then add it to ars in an already-failed state.

          int priority = HConstants.NORMAL_QOS;
          if (r instanceof Mutation) {
            priority = ((Mutation) r).getPriority();
          }
          retainedActions.add(new Action(r, ++posInList, priority));
          locationErrors.add(ex);
          locationErrorRows.add(posInList);
          it.remove();
          break; // Backward compat: we stop considering actions on location error.
        }
        ReturnCode code = checker.canTakeRow(loc, r);
        if (code == ReturnCode.END) {
          break;
        }
        if (code == ReturnCode.INCLUDE) {
          int priority = HConstants.NORMAL_QOS;
          if (r instanceof Mutation) {
            priority = ((Mutation) r).getPriority();
          }
          Action action = new Action(r, ++posInList, priority);
          setNonce(ng, r, action);
          retainedActions.add(action);
          // TODO: replica-get is not supported on this path
          byte[] regionName = loc.getRegionInfo().getRegionName();
          addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
          it.remove();
        }
      }
      firstIter = false;
    } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));

    if (retainedActions.isEmpty()) return NO_REQS_RESULT;

    return submitMultiActions(task, retainedActions, nonceGroup,
        locationErrors, locationErrorRows, actionsByServer);
  }

  <CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task,
      List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors,
      List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) {
    AsyncRequestFutureImpl<CResult> ars =
        createAsyncRequestFuture(task, retainedActions, nonceGroup);
    // Add location errors if any
    if (locationErrors != null) {
      for (int i = 0; i < locationErrors.size(); ++i) {
        int originalIndex = locationErrorRows.get(i);
        Row row = retainedActions.get(originalIndex).getAction();
        ars.manageError(originalIndex, row,
            AsyncRequestFutureImpl.Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
      }
    }
    ars.sendMultiAction(actionsByServer, 1, null, false);
    return ars;
  }

  /**
   * Helper that is used when grouping the actions per region server.
   *
   * @param server - server
   * @param regionName - regionName
   * @param action - the action to add to the multiaction
   * @param actionsByServer the multiaction per server
   * @param nonceGroup Nonce group.
   */
  static void addAction(ServerName server, byte[] regionName, Action action,
      Map<ServerName, MultiAction> actionsByServer, long nonceGroup) {
    MultiAction multiAction = actionsByServer.get(server);
    if (multiAction == null) {
      multiAction = new MultiAction();
      actionsByServer.put(server, multiAction);
    }
    if (action.hasNonce() && !multiAction.hasNonceGroup()) {
      multiAction.setNonceGroup(nonceGroup);
    }

    multiAction.add(regionName, action);
  }

  /**
   * Submit the list of rows immediately, whatever the server status. Kept for backward
   * compatibility: it allows the batch interface, which returns an array of objects, to be used.
   * @param task The setting and data
   */
  private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) {
    RowAccess<? extends Row> rows = task.getRowAccess();
    List<Action> actions = new ArrayList<>(rows.size());

    // The position will be used by processBatch to match the object array returned.
    int posInList = -1;
    NonceGenerator ng = this.connection.getNonceGenerator();
    int highestPriority = HConstants.PRIORITY_UNSET;
    for (Row r : rows) {
      posInList++;
      if (r instanceof Put) {
        Put put = (Put) r;
        if (put.isEmpty()) {
          throw new IllegalArgumentException("No columns to insert for #" + (posInList + 1)
              + " item");
        }
        highestPriority = Math.max(put.getPriority(), highestPriority);
      }
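      // Note: highestPriority is a running maximum, so each action is tagged with the highest
      // Put priority seen so far in the list.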
      Action action = new Action(r, posInList, highestPriority);
      setNonce(ng, r, action);
      actions.add(action);
    }
    AsyncRequestFutureImpl<CResult> ars =
        createAsyncRequestFuture(task, actions, ng.getNonceGroup());
    ars.groupAndSendMultiAction(actions, 1);
    return ars;
  }

  private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) {
    if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) {
      return NO_REQS_RESULT;
    }
    Objects.requireNonNull(task.getPool(), "The pool can't be NULL");
    checkOperationTimeout(task.getOperationTimeout());
    checkRpcTimeout(task.getRpcTimeout());
    return null;
  }

  private void setNonce(NonceGenerator ng, Row r, Action action) {
    if (!(r instanceof Append) && !(r instanceof Increment)) return;
    action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
  }

  private int checkTimeout(String name, int timeout) {
    if (timeout < 0) {
      throw new RuntimeException("The " + name + " cannot be negative, current value is "
          + timeout);
    }
    return timeout;
  }

  private int checkOperationTimeout(int operationTimeout) {
    return checkTimeout("operation timeout", operationTimeout);
  }

  private int checkRpcTimeout(int rpcTimeout) {
    return checkTimeout("rpc timeout", rpcTimeout);
  }

  @VisibleForTesting
  <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
      AsyncProcessTask task, List<Action> actions, long nonceGroup) {
    return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this);
  }

  /** Wait until the async process does not have more than max tasks in progress. */
  protected void waitForMaximumCurrentTasks(int max, TableName tableName)
      throws InterruptedIOException {
    requestController.waitForMaximumCurrentTasks(max, id, periodToLog,
      getLogger(tableName, max));
  }

  private Consumer<Long> getLogger(TableName tableName, long max) {
    return (currentInProgress) -> {
      LOG.info("#" + id + (max < 0 ?
          ", waiting for any free slot" :
          ", waiting for some tasks to finish. Expected max=" + max) + ", tasksInProgress="
          + currentInProgress + (tableName == null ? "" : ", tableName=" + tableName));
    };
  }

  void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
    requestController.incTaskCounters(regions, sn);
  }

  void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
    requestController.decTaskCounters(regions, sn);
  }

  /**
   * Create a caller. Isolated to be easily overridden in the tests.
   */
  @VisibleForTesting
  protected RpcRetryingCaller<AbstractResponse> createCaller(
      CancellableRegionServerCallable callable, int rpcTimeout) {
    return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout));
  }

  /**
   * Creates the server error tracker to use inside process.
   * Currently, to preserve the main assumption about current retries, and to work well with
   * the retry-limit-based calculation, the calculation is local per Process object.
   * We may benefit from connection-wide tracking of server errors.
   * @return a new ServerErrorTracker to use for this process
   */
  ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(
        this.serverTrackerTimeout, this.numTries);
  }

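  // Illustration only: the kind of request this predicate matches, e.g.
  //   Get g = new Get(row);
  //   g.setConsistency(Consistency.TIMELINE);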
  static boolean isReplicaGet(Row row) {
    return (row instanceof Get) && (((Get) row).getConsistency() == Consistency.TIMELINE);
  }

}