/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * This class allows a continuous flow of requests. It's written to be compatible with a
 * synchronous caller such as HTable.
 * <p>
 * The caller sends a buffer of operations by calling submit. This class extracts from this list
 * the operations it can send, i.e. the operations that are on regions that are not considered
 * busy. The process is asynchronous, i.e. it returns immediately once it has finished iterating
 * over the list. If, and only if, the maximum number of concurrent tasks is reached, the call
 * to submit will block. Alternatively, the caller can call submitAll, in which case all the
 * operations will be sent. Each call to submit returns a future-like object that can be used
 * to track operation progress.
 * </p>
 * <p>
 * The class manages retries internally.
 * </p>
 * <p>
 * Errors are tracked inside the Future object that is returned.
 * Results are always tracked inside the Future object and can be retrieved when the call has
 * finished. Partial results can also be retrieved if some part of a multi-request failed.
 * </p>
 * <p>
 * This class is thread safe.
 * Internally, it is thread safe enough to manage new submissions and results arising from older
 * operations simultaneously.
 * </p>
 * <p>
 * Internally, this class works with {@link Row}, which means it could theoretically be used for
 * gets as well.
 * </p>
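 * <p>
 * A minimal usage sketch (illustrative only: {@code asyncProcess} is an instance of this class,
 * while {@code pool}, {@code puts} and the timeout values are supplied by the caller; the task
 * builder setters shown are the ones used by the synchronous clients in this package):
 * </p>
 * <pre>{@code
 * AsyncProcessTask task = AsyncProcessTask.newBuilder()
 *     .setPool(pool)                       // ExecutorService owned by the caller
 *     .setTableName(tableName)
 *     .setRowAccess(puts)                  // the buffered operations to flush
 *     .setSubmittedRows(SubmittedRows.AT_LEAST_ONE)
 *     .setOperationTimeout(operationTimeout)
 *     .setRpcTimeout(writeRpcTimeout)
 *     .build();
 * AsyncRequestFuture future = asyncProcess.submit(task);
 * future.waitUntilDone();                  // block until every retained operation has finished
 * if (future.hasError()) {
 *   throw future.getErrors();              // RetriesExhaustedWithDetailsException
 * }
 * }</pre>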
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class AsyncProcess {
  private static final Logger LOG = LoggerFactory.getLogger(AsyncProcess.class);
  private static final AtomicLong COUNTER = new AtomicLong();

  public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";

  /**
   * Configures the number of failures after which the client will start logging. A few failures
   * are fine: the region moved, then is not opened, then is overloaded. We try to have an
   * acceptable heuristic for the number of errors we don't log. 5 was chosen because we wait
   * for 1s at this stage.
   */
  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
      "hbase.client.start.log.errors.counter";
  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;

  /**
   * Configuration key that decides whether to log details for batch errors.
   */
  public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
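
  // A minimal tuning sketch (illustrative only, not required by this class): a caller could set
  // the error-logging knobs above on its Configuration before the connection is created, e.g.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.client.start.log.errors.counter", 3);       // log from the 3rd failure
  //   conf.setBoolean("hbase.client.log.batcherrors.details", true); // log per-row batch details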

  /**
   * Return value from a submit that didn't contain any requests.
   */
  private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
    final Object[] result = new Object[0];

    @Override
    public boolean hasError() {
      return false;
    }

    @Override
    public RetriesExhaustedWithDetailsException getErrors() {
      return null;
    }

    @Override
    public List<? extends Row> getFailedOperations() {
      return null;
    }

    @Override
    public Object[] getResults() {
      return result;
    }

    @Override
    public void waitUntilDone() throws InterruptedIOException {
    }
  };

  // TODO: many of the fields should be made private
  final long id;

  final ClusterConnection connection;
  private final RpcRetryingCallerFactory rpcCallerFactory;
  final RpcControllerFactory rpcFactory;

  // Start configuration settings.
  final int startLogErrorsCnt;

  final long pause;
  final long pauseForCQTBE; // pause for CallQueueTooBigException, if specified
  final int numTries;
  @VisibleForTesting
  long serverTrackerTimeout;
  final long primaryCallTimeoutMicroseconds;
  /** Whether to log details for batch errors */
  final boolean logBatchErrorDetails;
  // End configuration settings.

  /**
   * The traffic control for requests.
   */
  @VisibleForTesting
  final RequestController requestController;
  public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
  private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
  private final int periodToLog;

  AsyncProcess(ClusterConnection hc, Configuration conf,
      RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
    if (hc == null) {
      throw new IllegalArgumentException("ClusterConnection cannot be null.");
    }

    this.connection = hc;

    this.id = COUNTER.incrementAndGet();

    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
    if (configuredPauseForCQTBE < pause) {
      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
          + ", will use " + pause + " instead.");
      this.pauseForCQTBE = pause;
    } else {
      this.pauseForCQTBE = configuredPauseForCQTBE;
    }
    // how many times we could try in total, one more than retry number
    this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
    this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
    this.startLogErrorsCnt =
        conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
    this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, DEFAULT_LOG_DETAILS_PERIOD);
    // Server tracker allows us to do faster, and yet useful (hopefully), retries.
    // However, if we are too useful, we might fail very quickly due to retry count limit.
    // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
    // retry time if normal retries were used. Then we will retry until this time runs out.
    // If we keep hitting one server, the net effect will be the incremental backoff, and
    // essentially the same number of retries as planned. If we have to do faster retries,
    // we will do more retries in aggregate, but the user will be none the wiser.
    this.serverTrackerTimeout = 0L;
    for (int i = 0; i < this.numTries; ++i) {
      serverTrackerTimeout = serverTrackerTimeout + ConnectionUtils.getPauseTime(this.pause, i);
    }
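    // Rough magnitude, as an illustration only (the exact figure depends on the backoff schedule
    // inside ConnectionUtils.getPauseTime): with the default 100 ms client pause and the default
    // number of tries, the summed pause times put serverTrackerTimeout on the order of minutes,
    // i.e. roughly the wall-clock budget a fully retried operation would have had.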

    this.rpcCallerFactory = rpcCaller;
    this.rpcFactory = rpcFactory;
    this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);

    this.requestController = RequestControllerFactory.create(conf);
  }

  /**
   * The submitted task may not be accomplished at all if there are too many running tasks or
   * other limits.
   * @param <CResult> The class to cast the result
   * @param task The setting and data
   * @return AsyncRequestFuture
   */
  public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task)
      throws InterruptedIOException {
    AsyncRequestFuture reqFuture = checkTask(task);
    if (reqFuture != null) {
      return reqFuture;
    }
    SubmittedRows submittedRows =
        task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows();
    switch (submittedRows) {
      case ALL:
        return submitAll(task);
      case AT_LEAST_ONE:
        return submit(task, true);
      default:
        return submit(task, false);
    }
  }

  /**
   * Extracts from the rows list what we can submit. The rows we cannot submit are kept in the
   * list. Does not send requests to replicas (not currently used for anything other than
   * streaming puts anyway).
   *
   * @param task The setting and data
   * @param atLeastOne true if we should submit at least a subset.
   */
  private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
      boolean atLeastOne) throws InterruptedIOException {
    TableName tableName = task.getTableName();
    RowAccess<? extends Row> rows = task.getRowAccess();
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
    List<Action> retainedActions = new ArrayList<>(rows.size());

    NonceGenerator ng = this.connection.getNonceGenerator();
    long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.

    // Location errors that happen before we decide what requests to take.
    List<Exception> locationErrors = null;
    List<Integer> locationErrorRows = null;
    RequestController.Checker checker = requestController.newChecker();
    boolean firstIter = true;
    do {
      // Wait until there is at least one slot for a new task.
      requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1));
      int posInList = -1;
      if (!firstIter) {
        checker.reset();
      }
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        HRegionLocation loc;
        try {
          if (r == null) {
            throw new IllegalArgumentException("#" + id + ", row cannot be null");
          }
          // Make sure we get the default (replica id 0) location.
          RegionLocations locs = connection.locateRegion(
              tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
          if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
            throw new IOException("#" + id + ", no location found, aborting submit for"
                + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
          }
          loc = locs.getDefaultRegionLocation();
        } catch (IOException ex) {
          locationErrors = new ArrayList<>(1);
          locationErrorRows = new ArrayList<>(1);
          LOG.error("Failed to get region location ", ex);
          // This action failed before creating ars. Retain it, but do not add to the submit list.
          // We will then add it to ars in an already-failed state.
          int priority = HConstants.NORMAL_QOS;
          if (r instanceof Mutation) {
            priority = ((Mutation) r).getPriority();
          }
          retainedActions.add(new Action(r, ++posInList, priority));
          locationErrors.add(ex);
          locationErrorRows.add(posInList);
          it.remove();
          break; // Backward compat: we stop considering actions on location error.
        }
        ReturnCode code = checker.canTakeRow(loc, r);
        if (code == ReturnCode.END) {
          break;
        }
        if (code == ReturnCode.INCLUDE) {
          int priority = HConstants.NORMAL_QOS;
          if (r instanceof Mutation) {
            priority = ((Mutation) r).getPriority();
          }
          Action action = new Action(r, ++posInList, priority);
          setNonce(ng, r, action);
          retainedActions.add(action);
          // TODO: replica-get is not supported on this path
          byte[] regionName = loc.getRegionInfo().getRegionName();
          addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
          it.remove();
        }
      }
      firstIter = false;
    } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));

    if (retainedActions.isEmpty()) {
      return NO_REQS_RESULT;
    }

    return submitMultiActions(task, retainedActions, nonceGroup,
        locationErrors, locationErrorRows, actionsByServer);
  }
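
  // Illustrative shape of the grouping produced by submit(task, atLeastOne) above, with
  // hypothetical rows and servers: three Puts whose regions are hosted on two servers would be
  // retained and grouped as
  //   actionsByServer = { server1 -> MultiAction{regionA: [put1, put2]},
  //                       server2 -> MultiAction{regionB: [put3]} }
  // and then dispatched per server by submitMultiActions below.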

  <CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task,
      List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors,
      List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) {
    AsyncRequestFutureImpl<CResult> ars =
        createAsyncRequestFuture(task, retainedActions, nonceGroup);
    // Add location errors if any
    if (locationErrors != null) {
      for (int i = 0; i < locationErrors.size(); ++i) {
        int originalIndex = locationErrorRows.get(i);
        Row row = retainedActions.get(originalIndex).getAction();
        ars.manageError(originalIndex, row,
            AsyncRequestFutureImpl.Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
      }
    }
    ars.sendMultiAction(actionsByServer, 1, null, false);
    return ars;
  }

  /**
   * Helper that is used when grouping the actions per region server.
   *
   * @param server the destination server
   * @param regionName the region name
   * @param action the action to add to the multiaction
   * @param actionsByServer the multiaction per server
   * @param nonceGroup the nonce group
   */
  static void addAction(ServerName server, byte[] regionName, Action action,
      Map<ServerName, MultiAction> actionsByServer, long nonceGroup) {
    MultiAction multiAction = actionsByServer.get(server);
    if (multiAction == null) {
      multiAction = new MultiAction();
      actionsByServer.put(server, multiAction);
    }
    if (action.hasNonce() && !multiAction.hasNonceGroup()) {
      multiAction.setNonceGroup(nonceGroup);
    }

    multiAction.add(regionName, action);
  }

  /**
   * Immediately submits the list of rows, whatever the server status. Kept for backward
   * compatibility: it allows use with the batch interface that returns an array of objects.
   * @param task The setting and data
   */
  private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) {
    RowAccess<? extends Row> rows = task.getRowAccess();
    List<Action> actions = new ArrayList<>(rows.size());

    // The position will be used by processBatch to match the returned object array.
    int posInList = -1;
    NonceGenerator ng = this.connection.getNonceGenerator();
    int highestPriority = HConstants.PRIORITY_UNSET;
    for (Row r : rows) {
      posInList++;
      if (r instanceof Put) {
        Put put = (Put) r;
        if (put.isEmpty()) {
          throw new IllegalArgumentException("No columns to insert for #" + (posInList + 1)
              + " item");
        }
        highestPriority = Math.max(put.getPriority(), highestPriority);
      }
      Action action = new Action(r, posInList, highestPriority);
      setNonce(ng, r, action);
      actions.add(action);
    }
    AsyncRequestFutureImpl<CResult> ars =
        createAsyncRequestFuture(task, actions, ng.getNonceGroup());
    ars.groupAndSendMultiAction(actions, 1);
    return ars;
  }

  private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) {
    if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) {
      return NO_REQS_RESULT;
    }
    Objects.requireNonNull(task.getPool(), "The pool can't be NULL");
    checkOperationTimeout(task.getOperationTimeout());
    checkRpcTimeout(task.getRpcTimeout());
    return null;
  }

  private void setNonce(NonceGenerator ng, Row r, Action action) {
    if (!(r instanceof Append) && !(r instanceof Increment)) return;
    action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
  }

  private int checkTimeout(String name, int timeout) {
    if (timeout < 0) {
      throw new RuntimeException("The " + name + " must not be negative, current value is "
          + timeout);
    }
    return timeout;
  }

  private int checkOperationTimeout(int operationTimeout) {
    return checkTimeout("operation timeout", operationTimeout);
  }

  private int checkRpcTimeout(int rpcTimeout) {
    return checkTimeout("rpc timeout", rpcTimeout);
  }

  @VisibleForTesting
  <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
      AsyncProcessTask task, List<Action> actions, long nonceGroup) {
    return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this);
  }

  /** Waits until the async process does not have more than max tasks in progress. */
  protected void waitForMaximumCurrentTasks(int max, TableName tableName)
      throws InterruptedIOException {
    requestController.waitForMaximumCurrentTasks(max, id, periodToLog,
        getLogger(tableName, max));
  }

  private Consumer<Long> getLogger(TableName tableName, long max) {
    return (currentInProgress) -> {
      LOG.info("#" + id + (max < 0 ?
          ", waiting for any free slot" :
          ", waiting for some tasks to finish. Expected max=" + max) + ", tasksInProgress="
          + currentInProgress + (tableName == null ? "" : ", tableName=" + tableName));
    };
  }

  void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
    requestController.incTaskCounters(regions, sn);
  }

  void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
    requestController.decTaskCounters(regions, sn);
  }

  /**
   * Creates a caller. Isolated to be easily overridden in the tests.
   */
  @VisibleForTesting
  protected RpcRetryingCaller<AbstractResponse> createCaller(
      CancellableRegionServerCallable callable, int rpcTimeout) {
    return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout));
  }

  /**
   * Creates the server error tracker to use inside the process.
   * Currently, to preserve the main assumption about current retries, and to work well with
   * the retry-limit-based calculation, the calculation is local per AsyncProcess object.
   * We may benefit from connection-wide tracking of server errors.
   * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
   */
  ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(
        this.serverTrackerTimeout, this.numTries);
  }

  static boolean isReplicaGet(Row row) {
    return (row instanceof Get) && (((Get) row).getConsistency() == Consistency.TIMELINE);
  }

}