001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.client; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.Objects; 031import java.util.concurrent.atomic.AtomicLong; 032import java.util.function.Consumer; 033 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.HRegionLocation; 037import org.apache.hadoop.hbase.RegionLocations; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; 041import org.apache.hadoop.hbase.client.RequestController.ReturnCode; 042import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.apache.yetus.audience.InterfaceStability; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * This class allows a continuous flow of requests. It's written to be compatible with a 051 * synchronous caller such as HTable. 052 * <p> 053 * The caller sends a buffer of operation, by calling submit. This class extract from this list 054 * the operations it can send, i.e. the operations that are on region that are not considered 055 * as busy. The process is asynchronous, i.e. it returns immediately when if has finished to 056 * iterate on the list. If, and only if, the maximum number of current task is reached, the call 057 * to submit will block. Alternatively, the caller can call submitAll, in which case all the 058 * operations will be sent. Each call to submit returns a future-like object that can be used 059 * to track operation progress. 060 * </p> 061 * <p> 062 * The class manages internally the retries. 063 * </p> 064 * <p> 065 * The errors are tracked inside the Future object that is returned. 066 * The results are always tracked inside the Future object and can be retrieved when the call 067 * has finished. Partial results can also be retrieved if some part of multi-request failed. 068 * </p> 069 * <p> 070 * This class is thread safe. 071 * Internally, the class is thread safe enough to manage simultaneously new submission and results 072 * arising from older operations. 073 * </p> 074 * <p> 075 * Internally, this class works with {@link Row}, this mean it could be theoretically used for 076 * gets as well. 077 * </p> 078 */ 079@InterfaceAudience.Private 080@InterfaceStability.Evolving 081class AsyncProcess { 082 private static final Logger LOG = LoggerFactory.getLogger(AsyncProcess.class); 083 private static final AtomicLong COUNTER = new AtomicLong(); 084 085 public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; 086 087 /** 088 * Configure the number of failures after which the client will start logging. A few failures 089 * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable 090 * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at 091 * this stage. 092 */ 093 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = 094 "hbase.client.start.log.errors.counter"; 095 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5; 096 097 /** 098 * Configuration to decide whether to log details for batch error 099 */ 100 public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details"; 101 102 /** 103 * Return value from a submit that didn't contain any requests. 104 */ 105 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() { 106 final Object[] result = new Object[0]; 107 108 @Override 109 public boolean hasError() { 110 return false; 111 } 112 113 @Override 114 public RetriesExhaustedWithDetailsException getErrors() { 115 return null; 116 } 117 118 @Override 119 public List<? extends Row> getFailedOperations() { 120 return null; 121 } 122 123 @Override 124 public Object[] getResults() { 125 return result; 126 } 127 128 @Override 129 public void waitUntilDone() throws InterruptedIOException { 130 } 131 }; 132 133 // TODO: many of the fields should be made private 134 final long id; 135 136 final ClusterConnection connection; 137 private final RpcRetryingCallerFactory rpcCallerFactory; 138 final RpcControllerFactory rpcFactory; 139 140 // Start configuration settings. 141 final int startLogErrorsCnt; 142 143 final long pause; 144 final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified 145 final int numTries; 146 long serverTrackerTimeout; 147 final long primaryCallTimeoutMicroseconds; 148 /** Whether to log details for batch errors */ 149 final boolean logBatchErrorDetails; 150 // End configuration settings. 151 152 /** 153 * The traffic control for requests. 154 */ 155 final RequestController requestController; 156 public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms"; 157 private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000; 158 private final int periodToLog; 159 AsyncProcess(ClusterConnection hc, Configuration conf, 160 RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) { 161 if (hc == null) { 162 throw new IllegalArgumentException("ClusterConnection cannot be null."); 163 } 164 165 this.connection = hc; 166 167 this.id = COUNTER.incrementAndGet(); 168 169 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 170 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 171 long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); 172 if (configuredPauseForCQTBE < pause) { 173 LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " 174 + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE 175 + ", will use " + pause + " instead."); 176 this.pauseForCQTBE = pause; 177 } else { 178 this.pauseForCQTBE = configuredPauseForCQTBE; 179 } 180 // how many times we could try in total, one more than retry number 181 this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 182 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; 183 this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); 184 this.startLogErrorsCnt = 185 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); 186 this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, DEFAULT_LOG_DETAILS_PERIOD); 187 // Server tracker allows us to do faster, and yet useful (hopefully), retries. 188 // However, if we are too useful, we might fail very quickly due to retry count limit. 189 // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum 190 // retry time if normal retries were used. Then we will retry until this time runs out. 191 // If we keep hitting one server, the net effect will be the incremental backoff, and 192 // essentially the same number of retries as planned. If we have to do faster retries, 193 // we will do more retries in aggregate, but the user will be none the wiser. 194 this.serverTrackerTimeout = 0L; 195 for (int i = 0; i < this.numTries; ++i) { 196 serverTrackerTimeout = serverTrackerTimeout + ConnectionUtils.getPauseTime(this.pause, i); 197 } 198 199 this.rpcCallerFactory = rpcCaller; 200 this.rpcFactory = rpcFactory; 201 this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false); 202 203 this.requestController = RequestControllerFactory.create(conf); 204 } 205 206 /** 207 * The submitted task may be not accomplished at all if there are too many running tasks or 208 * other limits. 209 * @param <CResult> The class to cast the result 210 * @param task The setting and data 211 * @return AsyncRequestFuture 212 */ 213 public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task) throws InterruptedIOException { 214 AsyncRequestFuture reqFuture = checkTask(task); 215 if (reqFuture != null) { 216 return reqFuture; 217 } 218 SubmittedRows submittedRows = task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows(); 219 switch (submittedRows) { 220 case ALL: 221 return submitAll(task); 222 case AT_LEAST_ONE: 223 return submit(task, true); 224 default: 225 return submit(task, false); 226 } 227 } 228 229 /** 230 * Extract from the rows list what we can submit. The rows we can not submit are kept in the 231 * list. Does not send requests to replicas (not currently used for anything other 232 * than streaming puts anyway). 233 * 234 * @param task The setting and data 235 * @param atLeastOne true if we should submit at least a subset. 236 */ 237 private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task, 238 boolean atLeastOne) throws InterruptedIOException { 239 TableName tableName = task.getTableName(); 240 RowAccess<? extends Row> rows = task.getRowAccess(); 241 Map<ServerName, MultiAction> actionsByServer = new HashMap<>(); 242 List<Action> retainedActions = new ArrayList<>(rows.size()); 243 244 NonceGenerator ng = this.connection.getNonceGenerator(); 245 long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. 246 247 // Location errors that happen before we decide what requests to take. 248 List<Exception> locationErrors = null; 249 List<Integer> locationErrorRows = null; 250 RequestController.Checker checker = requestController.newChecker(); 251 boolean firstIter = true; 252 do { 253 // Wait until there is at least one slot for a new task. 254 requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1)); 255 int posInList = -1; 256 if (!firstIter) { 257 checker.reset(); 258 } 259 Iterator<? extends Row> it = rows.iterator(); 260 while (it.hasNext()) { 261 Row r = it.next(); 262 HRegionLocation loc; 263 try { 264 if (r == null) { 265 throw new IllegalArgumentException("#" + id + ", row cannot be null"); 266 } 267 // Make sure we get 0-s replica. 268 RegionLocations locs = connection.locateRegion( 269 tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); 270 if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) { 271 throw new IOException("#" + id + ", no location found, aborting submit for" 272 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow())); 273 } 274 loc = locs.getDefaultRegionLocation(); 275 } catch (IOException ex) { 276 locationErrors = new ArrayList<>(1); 277 locationErrorRows = new ArrayList<>(1); 278 LOG.error("Failed to get region location ", ex); 279 // This action failed before creating ars. Retain it, but do not add to submit list. 280 // We will then add it to ars in an already-failed state. 281 282 int priority = HConstants.NORMAL_QOS; 283 if (r instanceof Mutation) { 284 priority = ((Mutation) r).getPriority(); 285 } 286 retainedActions.add(new Action(r, ++posInList, priority)); 287 locationErrors.add(ex); 288 locationErrorRows.add(posInList); 289 it.remove(); 290 break; // Backward compat: we stop considering actions on location error. 291 } 292 ReturnCode code = checker.canTakeRow(loc, r); 293 if (code == ReturnCode.END) { 294 break; 295 } 296 if (code == ReturnCode.INCLUDE) { 297 int priority = HConstants.NORMAL_QOS; 298 if (r instanceof Mutation) { 299 priority = ((Mutation) r).getPriority(); 300 } 301 Action action = new Action(r, ++posInList, priority); 302 setNonce(ng, r, action); 303 retainedActions.add(action); 304 // TODO: replica-get is not supported on this path 305 byte[] regionName = loc.getRegionInfo().getRegionName(); 306 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); 307 it.remove(); 308 } 309 } 310 firstIter = false; 311 } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); 312 313 if (retainedActions.isEmpty()) return NO_REQS_RESULT; 314 315 return submitMultiActions(task, retainedActions, nonceGroup, 316 locationErrors, locationErrorRows, actionsByServer); 317 } 318 319 <CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task, 320 List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors, 321 List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) { 322 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(task, retainedActions, nonceGroup); 323 // Add location errors if any 324 if (locationErrors != null) { 325 for (int i = 0; i < locationErrors.size(); ++i) { 326 int originalIndex = locationErrorRows.get(i); 327 Row row = retainedActions.get(originalIndex).getAction(); 328 ars.manageError(originalIndex, row, 329 AsyncRequestFutureImpl.Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null); 330 } 331 } 332 ars.sendMultiAction(actionsByServer, 1, null, false); 333 return ars; 334 } 335 336 /** 337 * Helper that is used when grouping the actions per region server. 338 * 339 * @param server - server 340 * @param regionName - regionName 341 * @param action - the action to add to the multiaction 342 * @param actionsByServer the multiaction per server 343 * @param nonceGroup Nonce group. 344 */ 345 static void addAction(ServerName server, byte[] regionName, Action action, 346 Map<ServerName, MultiAction> actionsByServer, long nonceGroup) { 347 MultiAction multiAction = actionsByServer.get(server); 348 if (multiAction == null) { 349 multiAction = new MultiAction(); 350 actionsByServer.put(server, multiAction); 351 } 352 if (action.hasNonce() && !multiAction.hasNonceGroup()) { 353 multiAction.setNonceGroup(nonceGroup); 354 } 355 356 multiAction.add(regionName, action); 357 } 358 359 /** 360 * Submit immediately the list of rows, whatever the server status. Kept for backward 361 * compatibility: it allows to be used with the batch interface that return an array of objects. 362 * @param task The setting and data 363 */ 364 private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) { 365 RowAccess<? extends Row> rows = task.getRowAccess(); 366 List<Action> actions = new ArrayList<>(rows.size()); 367 368 // The position will be used by the processBatch to match the object array returned. 369 int posInList = -1; 370 NonceGenerator ng = this.connection.getNonceGenerator(); 371 int highestPriority = HConstants.PRIORITY_UNSET; 372 for (Row r : rows) { 373 posInList++; 374 if (r instanceof Put) { 375 Put put = (Put) r; 376 if (put.isEmpty()) { 377 throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item"); 378 } 379 highestPriority = Math.max(put.getPriority(), highestPriority); 380 } 381 Action action = new Action(r, posInList, highestPriority); 382 setNonce(ng, r, action); 383 actions.add(action); 384 } 385 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(task, actions, ng.getNonceGroup()); 386 ars.groupAndSendMultiAction(actions, 1); 387 return ars; 388 } 389 390 private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) { 391 if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) { 392 return NO_REQS_RESULT; 393 } 394 Objects.requireNonNull(task.getPool(), "The pool can't be NULL"); 395 checkOperationTimeout(task.getOperationTimeout()); 396 checkRpcTimeout(task.getRpcTimeout()); 397 return null; 398 } 399 400 private void setNonce(NonceGenerator ng, Row r, Action action) { 401 if (!(r instanceof Append) && !(r instanceof Increment)) return; 402 action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. 403 } 404 405 private int checkTimeout(String name, int timeout) { 406 if (timeout < 0) { 407 throw new RuntimeException("The " + name + " must be bigger than zero," 408 + "current value is" + timeout); 409 } 410 return timeout; 411 } 412 private int checkOperationTimeout(int operationTimeout) { 413 return checkTimeout("operation timeout", operationTimeout); 414 } 415 416 private int checkRpcTimeout(int rpcTimeout) { 417 return checkTimeout("rpc timeout", rpcTimeout); 418 } 419 420 <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( 421 AsyncProcessTask task, List<Action> actions, long nonceGroup) { 422 return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this); 423 } 424 425 /** Wait until the async does not have more than max tasks in progress. */ 426 protected void waitForMaximumCurrentTasks(int max, TableName tableName) 427 throws InterruptedIOException { 428 requestController.waitForMaximumCurrentTasks(max, id, periodToLog, 429 getLogger(tableName, max)); 430 } 431 432 private Consumer<Long> getLogger(TableName tableName, long max) { 433 return (currentInProgress) -> { 434 LOG.info("#" + id + (max < 0 ? 435 ", waiting for any free slot" : 436 ", waiting for some tasks to finish. Expected max=" + max) + ", tasksInProgress=" 437 + currentInProgress + (tableName == null ? "" : ", tableName=" + tableName)); 438 }; 439 } 440 441 void incTaskCounters(Collection<byte[]> regions, ServerName sn) { 442 requestController.incTaskCounters(regions, sn); 443 } 444 445 446 void decTaskCounters(Collection<byte[]> regions, ServerName sn) { 447 requestController.decTaskCounters(regions, sn); 448 } 449 450 /** 451 * Create a caller. Isolated to be easily overridden in the tests. 452 */ 453 protected RpcRetryingCaller<AbstractResponse> createCaller( 454 CancellableRegionServerCallable callable, int rpcTimeout) { 455 return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout)); 456 } 457 458 /** 459 * Creates the server error tracker to use inside process. 460 * Currently, to preserve the main assumption about current retries, and to work well with 461 * the retry-limit-based calculation, the calculation is local per Process object. 462 * We may benefit from connection-wide tracking of server errors. 463 * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection 464 */ 465 ConnectionImplementation.ServerErrorTracker createServerErrorTracker() { 466 return new ConnectionImplementation.ServerErrorTracker( 467 this.serverTrackerTimeout, this.numTries); 468 } 469 470 static boolean isReplicaGet(Row row) { 471 return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE); 472 } 473 474}