/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class allows a continuous flow of requests. It's written to be compatible with a synchronous
 * caller such as HTable.
 * <p>
 * The caller sends a buffer of operations by calling submit. This class extracts from this list the
 * operations it can send, i.e. the operations that are on regions that are not considered busy. The
 * process is asynchronous, i.e. it returns immediately once it has finished iterating over the
 * list. If, and only if, the maximum number of concurrent tasks is reached, the call to submit will
 * block. Alternatively, the caller can call submitAll, in which case all the operations will be
 * sent. Each call to submit returns a future-like object that can be used to track operation
 * progress.
 * </p>
 * <p>
 * The class manages retries internally.
 * </p>
 * <p>
 * Errors are tracked inside the Future object that is returned. The results are always tracked
 * inside the Future object and can be retrieved when the call has finished. Partial results can
 * also be retrieved if some part of a multi-request failed.
 * </p>
 * <p>
 * This class is thread safe. Internally, the class is thread safe enough to manage simultaneously
 * new submissions and results arising from older operations.
 * </p>
 * <p>
 * Internally, this class works with {@link Row}, which means it could theoretically be used for
 * gets as well.
 * </p>
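 * <p>
 * A rough usage sketch (hedged: the pool, table name, timeouts and row list below are
 * caller-supplied assumptions, not part of this class):
 * </p>
 *
 * <pre>
 * {@code
 * AsyncProcessTask<Object> task = AsyncProcessTask.newBuilder()
 *   .setPool(pool)                        // caller-owned ExecutorService
 *   .setTableName(tableName)
 *   .setRowAccess(puts)                   // e.g. a List<Put>
 *   .setSubmittedRows(SubmittedRows.ALL)  // send everything, like the old submitAll
 *   .setOperationTimeout(operationTimeout)
 *   .setRpcTimeout(rpcTimeout)
 *   .build();
 * AsyncRequestFuture future = asyncProcess.submit(task);
 * future.waitUntilDone(); // block until all actions succeed or exhaust their retries
 * if (future.hasError()) {
 *   throw future.getErrors();
 * }
 * }
 * </pre>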
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class AsyncProcess {
  private static final Logger LOG = LoggerFactory.getLogger(AsyncProcess.class);
  private static final AtomicLong COUNTER = new AtomicLong();

  public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";

  /**
   * Configure the number of failures after which the client will start logging. A few failures are
   * fine: the region moved, then is not opened, then is overloaded. We try to have an acceptable
   * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at this
   * stage.
   */
  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
    "hbase.client.start.log.errors.counter";
  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;

  /**
   * Configuration to decide whether to log details for batch errors.
   */
  public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";

  /**
   * Return value from a submit that didn't contain any requests.
   */
  private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
    final Object[] result = new Object[0];

    @Override
    public boolean hasError() {
      return false;
    }

    @Override
    public RetriesExhaustedWithDetailsException getErrors() {
      return null;
    }

    @Override
    public List<? extends Row> getFailedOperations() {
      return null;
    }

    @Override
    public Object[] getResults() {
      return result;
    }

    @Override
    public void waitUntilDone() throws InterruptedIOException {
    }
  };

  // TODO: many of the fields should be made private
  final long id;

  final ClusterConnection connection;
  final ConnectionConfiguration connectionConfiguration;
  private final RpcRetryingCallerFactory rpcCallerFactory;
  final RpcControllerFactory rpcFactory;

  // Start configuration settings.
  final int startLogErrorsCnt;

  final int numTries;
  long serverTrackerTimeout;
  final long primaryCallTimeoutMicroseconds;
  /** Whether to log details for batch errors */
  final boolean logBatchErrorDetails;
  // End configuration settings.

  /**
   * The traffic control for requests.
   */
  final RequestController requestController;
  public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
  private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
  private final int periodToLog;
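
  /*
   * A minimal configuration sketch for the knobs above (the values are illustrative assumptions,
   * not recommendations):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 5);    // stay quiet for the first 5 failures
   *   conf.setBoolean(LOG_DETAILS_FOR_BATCH_ERROR, true);  // log per-server detail on batch errors
   *   conf.setInt(LOG_DETAILS_PERIOD, 10000);              // report in-progress tasks every 10 s
   */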

  AsyncProcess(ClusterConnection hc, Configuration conf, RpcRetryingCallerFactory rpcCaller,
    RpcControllerFactory rpcFactory) {
    this(hc, conf, rpcCaller, rpcFactory, hc.getConnectionConfiguration().getRetriesNumber());
  }

  AsyncProcess(ClusterConnection hc, Configuration conf, RpcRetryingCallerFactory rpcCaller,
    RpcControllerFactory rpcFactory, int retriesNumber) {
    if (hc == null) {
      throw new IllegalArgumentException("ClusterConnection cannot be null.");
    }

    this.connection = hc;
    this.connectionConfiguration = connection.getConnectionConfiguration();

    this.id = COUNTER.incrementAndGet();

    // How many times we could try in total: one more than the retry number.
    this.numTries = retriesNumber + 1;
    this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
    this.startLogErrorsCnt =
      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
    this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, DEFAULT_LOG_DETAILS_PERIOD);
    // The server tracker allows us to do faster, and yet useful (hopefully), retries.
    // However, if we retry too quickly, we might fail very fast due to the retry count limit.
    // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate the maximum
    // retry time if normal retries were used. Then we will retry until this time runs out.
    // If we keep hitting one server, the net effect will be the incremental backoff, and
    // essentially the same number of retries as planned. If we have to do faster retries,
    // we will do more retries in aggregate, but the user will be none the wiser.
    this.serverTrackerTimeout = 0L;
    for (int i = 0; i < this.numTries; ++i) {
      serverTrackerTimeout = serverTrackerTimeout
        + ConnectionUtils.getPauseTime(connectionConfiguration.getPauseMillis(), i);
    }
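    // For illustration (an assumed configuration, ignoring the small jitter getPauseTime adds):
    // with a 100 ms pause and numTries = 8, the standard backoff multipliers
    // {1, 2, 3, 5, 10, 20, 40, 100} sum to 181, so serverTrackerTimeout is roughly 18,100 ms.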

    this.rpcCallerFactory = rpcCaller;
    this.rpcFactory = rpcFactory;
    this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);

    this.requestController = RequestControllerFactory.create(conf);
  }

  /**
   * The submitted task may not be accomplished at all if there are too many running tasks or other
   * limits.
   * @param <CResult> The class to cast the result to
   * @param task      The settings and data
   */
  public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task)
    throws InterruptedIOException {
    AsyncRequestFuture reqFuture = checkTask(task);
    if (reqFuture != null) {
      return reqFuture;
    }
    SubmittedRows submittedRows =
      task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows();
    switch (submittedRows) {
      case ALL:
        return submitAll(task);
      case AT_LEAST_ONE:
        return submit(task, true);
      default:
        return submit(task, false);
    }
  }
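
  /*
   * A hedged caller-side sketch of the SubmittedRows modes dispatched above (the loop is an
   * assumption about typical usage, not prescribed by this class):
   *
   *   SubmittedRows.ALL          -> submitAll(task): every row is sent, busy regions or not.
   *   SubmittedRows.AT_LEAST_ONE -> submit(task, true): blocks until at least one row is taken;
   *                                 rows not taken stay in the task's RowAccess, so a caller can
   *                                 loop: while (!rows.isEmpty()) futures.add(ap.submit(task));
   *   SubmittedRows.NORMAL       -> submit(task, false): takes what fits, returns immediately.
   */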

  /**
   * Extract from the rows list what we can submit. The rows we cannot submit are kept in the list.
   * Does not send requests to replicas (not currently used for anything other than streaming puts
   * anyway).
   * @param task       The settings and data
   * @param atLeastOne true if we should submit at least a subset.
   */
  private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task, boolean atLeastOne)
    throws InterruptedIOException {
    TableName tableName = task.getTableName();
    RowAccess<? extends Row> rows = task.getRowAccess();
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
    List<Action> retainedActions = new ArrayList<>(rows.size());

    NonceGenerator ng = this.connection.getNonceGenerator();
    long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.

    // Location errors that happen before we decide what requests to take.
    List<Exception> locationErrors = null;
    List<Integer> locationErrorRows = null;
    RequestController.Checker checker = requestController.newChecker();
    boolean firstIter = true;
    do {
      // Wait until there is at least one slot for a new task.
      requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1));
      int posInList = -1;
      if (!firstIter) {
        checker.reset();
      }
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        HRegionLocation loc;
        try {
          if (r == null) {
            throw new IllegalArgumentException("#" + id + ", row cannot be null");
          }
          // Make sure we get the default (replica id 0) location.
          RegionLocations locs = connection.locateRegion(tableName, r.getRow(), true, true,
            RegionReplicaUtil.DEFAULT_REPLICA_ID);
          if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
            throw new IOException("#" + id + ", no location found, aborting submit for"
              + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
          }
          loc = locs.getDefaultRegionLocation();
        } catch (IOException ex) {
          locationErrors = new ArrayList<>(1);
          locationErrorRows = new ArrayList<>(1);
          LOG.error("Failed to get region location ", ex);
          // This action failed before creating ars. Retain it, but do not add to the submit list.
          // We will then add it to ars in an already-failed state.

          int priority = HConstants.NORMAL_QOS;
          if (r instanceof Mutation) {
            priority = ((Mutation) r).getPriority();
          }
          retainedActions.add(new Action(r, ++posInList, priority));
          locationErrors.add(ex);
          locationErrorRows.add(posInList);
          it.remove();
          break; // Backward compat: we stop considering actions on location error.
        }
        ReturnCode code = checker.canTakeRow(loc, r);
        if (code == ReturnCode.END) {
          break;
        }
        if (code == ReturnCode.INCLUDE) {
          int priority = HConstants.NORMAL_QOS;
          if (r instanceof Mutation) {
            priority = ((Mutation) r).getPriority();
          }
          Action action = new Action(r, ++posInList, priority);
          setNonce(ng, r, action);
          retainedActions.add(action);
          // TODO: replica-get is not supported on this path
          byte[] regionName = loc.getRegionInfo().getRegionName();
          addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
          it.remove();
        }
      }
      firstIter = false;
    } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));

    if (retainedActions.isEmpty()) {
      return NO_REQS_RESULT;
    }

    return submitMultiActions(task, retainedActions, nonceGroup, locationErrors, locationErrorRows,
      actionsByServer);
  }

  <CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task,
    List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors,
    List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) {
    AsyncRequestFutureImpl<CResult> ars =
      createAsyncRequestFuture(task, retainedActions, nonceGroup);
    // Add location errors if any.
    if (locationErrors != null) {
      for (int i = 0; i < locationErrors.size(); ++i) {
        int originalIndex = locationErrorRows.get(i);
        Row row = retainedActions.get(originalIndex).getAction();
        ars.manageError(originalIndex, row, AsyncRequestFutureImpl.Retry.NO_LOCATION_PROBLEM,
          locationErrors.get(i), null);
      }
    }
    ars.sendMultiAction(actionsByServer, 1, null, false);
    return ars;
  }

  /**
   * Helper that is used when grouping the actions per region server.
   * @param server          the server the action goes to
   * @param regionName      the region the action is for
   * @param action          the action to add to the multiaction
   * @param actionsByServer the multiaction per server
   * @param nonceGroup      the nonce group
   */
  static void addAction(ServerName server, byte[] regionName, Action action,
    Map<ServerName, MultiAction> actionsByServer, long nonceGroup) {
    MultiAction multiAction = actionsByServer.get(server);
    if (multiAction == null) {
      multiAction = new MultiAction();
      actionsByServer.put(server, multiAction);
    }
    if (action.hasNonce() && !multiAction.hasNonceGroup()) {
      multiAction.setNonceGroup(nonceGroup);
    }

    multiAction.add(regionName, action);
  }

  /**
   * Immediately submit the list of rows, whatever the server status. Kept for backward
   * compatibility: it allows this class to be used with the batch interface that returns an array
   * of objects.
   * @param task The settings and data
   */
  private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) {
    RowAccess<? extends Row> rows = task.getRowAccess();
    List<Action> actions = new ArrayList<>(rows.size());

    // The position will be used by processBatch to match the object array returned.
    int posInList = -1;
    NonceGenerator ng = this.connection.getNonceGenerator();
    int highestPriority = HConstants.PRIORITY_UNSET;
    for (Row r : rows) {
      posInList++;
      if (r instanceof Put) {
        Put put = (Put) r;
        if (put.isEmpty()) {
          throw new IllegalArgumentException(
            "No columns to insert for #" + (posInList + 1) + " item");
        }
        highestPriority = Math.max(put.getPriority(), highestPriority);
      }
      Action action = new Action(r, posInList, highestPriority);
      setNonce(ng, r, action);
      actions.add(action);
    }
    AsyncRequestFutureImpl<CResult> ars =
      createAsyncRequestFuture(task, actions, ng.getNonceGroup());
    ars.groupAndSendMultiAction(actions, 1);
    return ars;
  }

  private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) {
    if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) {
      return NO_REQS_RESULT;
    }
    Objects.requireNonNull(task.getPool(), "The pool can't be NULL");
    checkOperationTimeout(task.getOperationTimeout());
    checkRpcTimeout(task.getRpcTimeout());
    return null;
  }
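
  /*
   * Why the nonce matters: Increment and Append are not idempotent, so a retried RPC that reached
   * the server twice would be applied twice. A hedged illustration (cf, q, row and value are
   * assumed names):
   *
   *   new Increment(row).addColumn(cf, q, 1L);  // non-idempotent: needs a nonce (see below)
   *   new Put(row).addColumn(cf, q, value);     // idempotent: replay is harmless, no nonce
   */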

  private void setNonce(NonceGenerator ng, Row r, Action action) {
    if (hasIncrementOrAppend(r)) {
      action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
    }
  }

  private static boolean hasIncrementOrAppend(Row action) {
    if (action instanceof Append || action instanceof Increment) {
      return true;
    } else if (action instanceof RowMutations) {
      return hasIncrementOrAppend((RowMutations) action);
    } else if (action instanceof CheckAndMutate) {
      return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
    }
    return false;
  }

  private static boolean hasIncrementOrAppend(RowMutations mutations) {
    for (Mutation mutation : mutations.getMutations()) {
      if (mutation instanceof Append || mutation instanceof Increment) {
        return true;
      }
    }
    return false;
  }

  private int checkTimeout(String name, int timeout) {
    if (timeout < 0) {
      throw new RuntimeException(
        "The " + name + " must not be negative, current value is " + timeout);
    }
    return timeout;
  }

  private int checkOperationTimeout(int operationTimeout) {
    return checkTimeout("operation timeout", operationTimeout);
  }

  private int checkRpcTimeout(int rpcTimeout) {
    return checkTimeout("rpc timeout", rpcTimeout);
  }

  <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(AsyncProcessTask task,
    List<Action> actions, long nonceGroup) {
    return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this);
  }

  /** Wait until the async process does not have more than max tasks in progress. */
  protected void waitForMaximumCurrentTasks(int max, TableName tableName)
    throws InterruptedIOException {
    requestController.waitForMaximumCurrentTasks(max, id, periodToLog, getLogger(tableName, max));
  }

  private Consumer<Long> getLogger(TableName tableName, long max) {
    return (currentInProgress) -> {
      LOG.info("#" + id
        + (max < 0
          ? ", waiting for any free slot"
          : ", waiting for some tasks to finish. Expected max=" + max)
        + ", tasksInProgress=" + currentInProgress
        + (tableName == null ? "" : ", tableName=" + tableName));
    };
  }

  void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
    requestController.incTaskCounters(regions, sn);
  }

  void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
    requestController.decTaskCounters(regions, sn);
  }

  /**
   * Create a caller. Isolated to be easily overridden in the tests.
   */
  protected RpcRetryingCaller<AbstractResponse>
    createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
    return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout));
  }

  /**
   * Creates the server error tracker to use inside the process. Currently, to preserve the main
   * assumption about current retries, and to work well with the retry-limit-based calculation, the
   * calculation is local per process object. We may benefit from connection-wide tracking of
   * server errors.
   * @return the ServerErrorTracker to use
   */
  ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(this.serverTrackerTimeout,
      this.numTries);
  }
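
  /*
   * For illustration, a hedged sketch of a get that isReplicaGet (below) classifies as a replica
   * get (the row key is an assumption):
   *
   *   Get get = new Get(Bytes.toBytes("row1"));
   *   get.setConsistency(Consistency.TIMELINE); // allows reads from secondary replicas
   *   // isReplicaGet(get) -> true; with the default Consistency.STRONG it would return false
   */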
"" : ", tableName=" + tableName)); 454 }; 455 } 456 457 void incTaskCounters(Collection<byte[]> regions, ServerName sn) { 458 requestController.incTaskCounters(regions, sn); 459 } 460 461 void decTaskCounters(Collection<byte[]> regions, ServerName sn) { 462 requestController.decTaskCounters(regions, sn); 463 } 464 465 /** 466 * Create a caller. Isolated to be easily overridden in the tests. 467 */ 468 protected RpcRetryingCaller<AbstractResponse> 469 createCaller(CancellableRegionServerCallable callable, int rpcTimeout) { 470 return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout)); 471 } 472 473 /** 474 * Creates the server error tracker to use inside process. Currently, to preserve the main 475 * assumption about current retries, and to work well with the retry-limit-based calculation, the 476 * calculation is local per Process object. We may benefit from connection-wide tracking of server 477 * errors. 478 * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection 479 */ 480 ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { 481 return new ConnectionImplementation.ServerErrorTracker(this.serverTrackerTimeout, 482 this.numTries); 483 } 484 485 static boolean isReplicaGet(Row row) { 486 return (row instanceof Get) && (((Get) row).getConsistency() == Consistency.TIMELINE); 487 } 488 489}