/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. Each put
 * will be sharded into different buffer queues based on its destination region server. So each
 * region server buffer queue will only have the puts which share the same destination. And each
 * queue will have a flush worker thread to flush the puts request to the region server. If any
 * queue is full, the HTableMultiplexer starts to drop the Put requests for that particular queue.
 * <p>
 * Also all the puts will be retried a configurable number of times before dropping. And the
 * HTableMultiplexer can report the number of buffered requests and the number of the failed
 * (dropped) requests in total or on per region server basis.
 * <p>
 * This class is thread safe.
 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use
 *             {@link BufferedMutator} for batching mutations.
 */
@Deprecated
@InterfaceAudience.Public
public class HTableMultiplexer {
  private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName());

  /** Configuration key: period, in milliseconds, between two flushes of a worker (default 100). */
  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
    "hbase.tablemultiplexer.flush.period.ms";
  /** Configuration key: core size of the scheduled flush-worker thread pool (default 10). */
  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
  /** Configuration key: max number of puts per worker awaiting resubmission (default 10000). */
  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
    "hbase.client.max.retries.in.queue";

  /** The map between each region server to its flush worker */
  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
    new ConcurrentHashMap<>();

  private final Configuration conf;
  private final ClusterConnection conn;
  private final ExecutorService pool;
  private final int maxAttempts;
  private final int perRegionServerBufferQueueSize;
  private final int maxKeyValueSize;
  private final ScheduledExecutorService executor;
  private final long flushPeriod;

  /**
   * Creates a multiplexer on top of a new connection obtained from
   * {@link ConnectionFactory#createConnection(Configuration)}.
   * @param conf                           The HBaseConfiguration
   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
   *                                       each region server before dropping the request.
   * @throws IOException if the connection cannot be created
   */
  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
    throws IOException {
    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
  }

  /**
   * Creates a multiplexer on top of an existing connection.
   * @param conn                           The HBase connection; it is cast to
   *                                       {@link ClusterConnection}.
   * @param conf                           The HBase configuration
   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
   *                                       each region server before dropping the request.
   */
  public HTableMultiplexer(Connection conn, Configuration conf,
    int perRegionServerBufferQueueSize) {
    this.conn = (ClusterConnection) conn;
    this.pool = HTable.getDefaultExecutor(conf);
    // how many times we could try in total, one more than retry number
    this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
    this.executor = Executors.newScheduledThreadPool(initThreads,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
    this.conf = conf;
  }

  /**
   * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already been
   * closed.
   * <p>
   * Only the connection is closed here; the scheduled flush executor and the worker pool are not
   * shut down by this method.
   * @throws IOException If there is an error closing the connection.
   */
  public synchronized void close() throws IOException {
    if (!getConnection().isClosed()) {
      getConnection().close();
    }
  }

  /**
   * The put request will be buffered by its corresponding buffer queue. Return false if the queue
   * is already full.
   * @param tableName the table the put is destined for
   * @param put       the put to buffer
   * @return true if the request can be accepted by its corresponding buffer queue.
   */
  public boolean put(TableName tableName, final Put put) {
    return put(tableName, put, this.maxAttempts);
  }

  /**
   * The puts request will be buffered by their corresponding buffer queue. Return the list of puts
   * which could not be queued.
   * @param tableName the table the puts are destined for
   * @param puts      the puts to buffer; may be {@code null}
   * @return the list of puts which could not be queued, or {@code null} when {@code puts} is
   *         {@code null} or when every put was queued
   */
  public List<Put> put(TableName tableName, final List<Put> puts) {
    if (puts == null) {
      return null;
    }

    // Created lazily so that the "everything was queued" case keeps returning null.
    List<Put> failedPuts = null;
    for (Put put : puts) {
      if (!put(tableName, put, this.maxAttempts)) {
        if (failedPuts == null) {
          failedPuts = new ArrayList<>();
        }
        failedPuts.add(put);
      }
    }
    return failedPuts;
  }

  /**
   * @deprecated Use {@link #put(TableName, List) } instead.
   */
  @Deprecated
  public List<Put> put(byte[] tableName, final List<Put> puts) {
    return put(TableName.valueOf(tableName), puts);
  }

  /**
   * The put request will be buffered by its corresponding buffer queue. And the put request will be
   * retried before dropping the request. Return false if the queue is already full.
   * <p>
   * False is also returned when {@code maxAttempts} is not positive, when no region location is
   * found for the row, or when validating/locating the put raises an {@link IOException} (which is
   * logged at debug level).
   * @param tableName   the table the put is destined for
   * @param put         the put to buffer
   * @param maxAttempts the number of attempts left for this put
   * @return true if the request can be accepted by its corresponding buffer queue.
   */
  public boolean put(final TableName tableName, final Put put, int maxAttempts) {
    if (maxAttempts <= 0) {
      return false;
    }

    try {
      ConnectionUtils.validatePut(put, maxKeyValueSize);
      // Allow mocking to get at the connection, but don't expose the connection to users.
      ClusterConnection conn = (ClusterConnection) getConnection();
      // AsyncProcess in the FlushWorker should take care of refreshing the location cache
      // as necessary. We shouldn't have to do that here.
      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
      if (loc != null) {
        // Add the put pair into its corresponding queue.
        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);

        // Generate a MultiPutStatus object and offer it into the queue
        PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts);

        // offer() does not block: it returns false when the bounded queue is full.
        return queue.offer(s);
      }
    } catch (IOException e) {
      LOG.debug("Cannot process the put " + put, e);
    }
    return false;
  }

  /**
   * @deprecated Use {@link #put(TableName, Put, int)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, final Put put, int retry) {
    return put(TableName.valueOf(tableName), put, retry);
  }

  /**
   * @deprecated Use {@link #put(TableName, Put)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, Put put) {
    return put(TableName.valueOf(tableName), put);
  }

  /** Returns the current HTableMultiplexerStatus */
  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
  }

  /**
   * Returns the buffer queue of the flush worker registered for {@code addr}, creating and
   * scheduling that worker at a fixed rate of {@code flushPeriod} milliseconds on first use.
   * @param addr the location whose queue is requested
   * @return the queue of the worker mapped to {@code addr}
   */
  @InterfaceAudience.Private
  @SuppressWarnings("FutureReturnValueIgnored")
  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
    FlushWorker worker = serverToFlushWorkerMap.get(addr);
    if (worker == null) {
      // Re-check under the lock so that only one worker is created and scheduled per key.
      synchronized (this.serverToFlushWorkerMap) {
        worker = serverToFlushWorkerMap.get(addr);
        if (worker == null) {
          // Create the flush worker
          worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize,
            pool, executor);
          this.serverToFlushWorkerMap.put(addr, worker);
          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
        }
      }
    }
    return worker.getQueue();
  }

  /** Returns the connection used by this multiplexer. */
  @InterfaceAudience.Private
  ClusterConnection getConnection() {
    return this.conn;
  }

  /**
   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. It reports
   * the number of buffered requests and the number of the failed (dropped) requests in total or on
   * per region server basis.
   * <p>
   * Building a status object reads and resets the max-latency and average-latency counters of
   * every flush worker it is given.
   * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use
   *             {@link BufferedMutator} for batching mutations.
   */
  @Deprecated
  @InterfaceAudience.Public
  public static class HTableMultiplexerStatus {
    private long totalFailedPutCounter;
    private long totalBufferedPutCounter;
    private long maxLatency;
    private long overallAverageLatency;
    private final Map<String, Long> serverToFailedCounterMap;
    private final Map<String, Long> serverToBufferedCounterMap;
    private final Map<String, Long> serverToAverageLatencyMap;
    private final Map<String, Long> serverToMaxLatencyMap;

    /**
     * @param serverToFlushWorkerMap the workers to collect the counters from; a {@code null} map
     *                               yields an all-zero status with empty per-server maps
     */
    public HTableMultiplexerStatus(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      this.totalBufferedPutCounter = 0;
      this.totalFailedPutCounter = 0;
      this.maxLatency = 0;
      this.overallAverageLatency = 0;
      this.serverToBufferedCounterMap = new HashMap<>();
      this.serverToFailedCounterMap = new HashMap<>();
      this.serverToAverageLatencyMap = new HashMap<>();
      this.serverToMaxLatencyMap = new HashMap<>();
      this.initialize(serverToFlushWorkerMap);
    }

    /** Aggregates the counters of every worker into the totals and the per-server maps. */
    private void initialize(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      if (serverToFlushWorkerMap == null) {
        return;
      }

      long averageCalcSum = 0;
      int averageCalcCount = 0;
      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap.entrySet()) {
        HRegionLocation addr = entry.getKey();
        FlushWorker worker = entry.getValue();

        long bufferedCounter = worker.getTotalBufferedCount();
        long failedCounter = worker.getTotalFailedCount();
        long serverMaxLatency = worker.getMaxLatency();
        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
        // Take the sum and count in ONE atomic snapshot-and-reset. Reading the components and
        // resetting in two separate calls lets a sample slip in between them, which would then be
        // counted in the per-server average but not in the overall average.
        SimpleEntry<Long, Integer> averageComponents = averageCounter.getComponentsAndReset();
        long serverLatencySum = averageComponents.getKey();
        int serverLatencyCount = averageComponents.getValue();
        long serverAvgLatency =
          serverLatencyCount != 0 ? serverLatencySum / serverLatencyCount : 0;

        this.totalBufferedPutCounter += bufferedCounter;
        this.totalFailedPutCounter += failedCounter;
        if (serverMaxLatency > this.maxLatency) {
          this.maxLatency = serverMaxLatency;
        }
        averageCalcSum += serverLatencySum;
        averageCalcCount += serverLatencyCount;

        this.serverToBufferedCounterMap.put(addr.getHostnamePort(), bufferedCounter);
        this.serverToFailedCounterMap.put(addr.getHostnamePort(), failedCounter);
        this.serverToAverageLatencyMap.put(addr.getHostnamePort(), serverAvgLatency);
        this.serverToMaxLatencyMap.put(addr.getHostnamePort(), serverMaxLatency);
      }
      this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum / averageCalcCount : 0;
    }

    /** Returns the sum of the buffered put counts of all workers. */
    public long getTotalBufferedCounter() {
      return this.totalBufferedPutCounter;
    }

    /** Returns the sum of the failed put counts of all workers. */
    public long getTotalFailedCounter() {
      return this.totalFailedPutCounter;
    }

    /** Returns the largest max latency reported by any worker. */
    public long getMaxLatency() {
      return this.maxLatency;
    }

    /** Returns the average latency computed over the samples of all workers. */
    public long getOverallAverageLatency() {
      return this.overallAverageLatency;
    }

    /** Returns the buffered put count keyed by {@code host:port}. */
    public Map<String, Long> getBufferedCounterForEachRegionServer() {
      return this.serverToBufferedCounterMap;
    }

    /** Returns the failed put count keyed by {@code host:port}. */
    public Map<String, Long> getFailedCounterForEachRegionServer() {
      return this.serverToFailedCounterMap;
    }

    /** Returns the max latency keyed by {@code host:port}. */
    public Map<String, Long> getMaxLatencyForEachRegionServer() {
      return this.serverToMaxLatencyMap;
    }

    /** Returns the average latency keyed by {@code host:port}. */
    public Map<String, Long> getAverageLatencyForEachRegionServer() {
      return this.serverToAverageLatencyMap;
    }
  }

  /** A buffered put together with its target region and its remaining attempt budget. */
  @InterfaceAudience.Private
  static class PutStatus {
    final RegionInfo regionInfo;
    final Put put;
    final int maxAttempCount;

    public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) {
      this.regionInfo = regionInfo;
      this.put = put;
      this.maxAttempCount = maxAttempCount;
    }
  }

  /**
   * Helper to count the average over an interval until reset. All methods synchronize on the
   * counter instance.
   */
  private static class AtomicAverageCounter {
    private long sum;
    private int count;

    public AtomicAverageCounter() {
      this.sum = 0L;
      this.count = 0;
    }

    /** Returns the current average and resets the counter, as one atomic step. */
    public synchronized long getAndReset() {
      long result = this.get();
      this.reset();
      return result;
    }

    /** Returns {@code sum / count}, or 0 when no value has been added since the last reset. */
    public synchronized long get() {
      if (this.count == 0) {
        return 0;
      }
      return this.sum / this.count;
    }

    /** Returns the current (sum, count) pair without resetting. */
    public synchronized SimpleEntry<Long, Integer> getComponents() {
      return new SimpleEntry<>(sum, count);
    }

    /** Returns the current (sum, count) pair and resets the counter, as one atomic step. */
    public synchronized SimpleEntry<Long, Integer> getComponentsAndReset() {
      SimpleEntry<Long, Integer> components = this.getComponents();
      this.reset();
      return components;
    }

    public synchronized void reset() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized void add(long value) {
      this.sum += value;
      this.count++;
    }
  }

  /**
   * Periodic task that drains the buffer queue of one location and sends the drained puts as a
   * single multi-action through {@link AsyncProcess}; puts without a {@link Result} are handed to
   * {@link #resubmitFailedPut(PutStatus, HRegionLocation)}.
   */
  @InterfaceAudience.Private
  static class FlushWorker implements Runnable {
    private final HRegionLocation addr;
    private final LinkedBlockingQueue<PutStatus> queue;
    private final HTableMultiplexer multiplexer;
    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
    private final AtomicLong maxLatency = new AtomicLong(0);

    private final AsyncProcess ap;
    private final List<PutStatus> processingList = new ArrayList<>();
    private final ScheduledExecutorService executor;
    private final int maxRetryInQueue;
    private final AtomicInteger retryInQueue = new AtomicInteger(0);
    private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
    private final int operationTimeout;
    private final ExecutorService pool;

    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
      HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, ExecutorService pool,
      ScheduledExecutorService executor) {
      this.addr = addr;
      this.multiplexer = htableMultiplexer;
      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
      // The write timeout falls back to the generic RPC timeout when it is not configured.
      this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
        conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
      // Specify 0 retries in AsyncProcess because we need to reassign puts to different queues
      // if regions are moved.
      this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory, 0);
      this.executor = executor;
      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
      this.pool = pool;
    }

    protected LinkedBlockingQueue<PutStatus> getQueue() {
      return this.queue;
    }

    public long getTotalFailedCount() {
      return totalFailedPutCount.get();
    }

    /** Returns the number of queued puts plus the number of puts currently being flushed. */
    public long getTotalBufferedCount() {
      return (long) queue.size() + currentProcessingCount.get();
    }

    public AtomicAverageCounter getAverageLatencyCounter() {
      return this.averageLatency;
    }

    /** Returns the max latency recorded since the previous call and resets it to 0. */
    public long getMaxLatency() {
      return this.maxLatency.getAndSet(0);
    }

    /**
     * Schedules a failed put to be offered to the multiplexer again, with one attempt less, after
     * the delay given by {@link #getNextDelay(int)}.
     * @param ps     the failed put
     * @param oldLoc the location the put was sent to (not used by this method)
     * @return false when the put has no attempts left or when too many resubmissions are already
     *         pending for this worker; true when the resubmission was scheduled
     */
    @SuppressWarnings("FutureReturnValueIgnored")
    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
      // Decrease the retry count
      final int retryCount = ps.maxAttempCount - 1;

      if (retryCount <= 0) {
        // Update the failed counter and no retry any more.
        return false;
      }

      int cnt = getRetryInQueue().incrementAndGet();
      if (cnt > getMaxRetryInQueue()) {
        // Too many Puts in queue for resubmit, give up this
        getRetryInQueue().decrementAndGet();
        return false;
      }

      final Put failedPut = ps.put;
      // The currentPut is failed. So get the table name for the currentPut.
      final TableName tableName = ps.regionInfo.getTable();

      long delayMs = getNextDelay(retryCount);
      if (LOG.isDebugEnabled()) {
        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
      }

      // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
      // the region location cache when the Put original failed with some exception. If we keep
      // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
      // that we expect it to.
      getExecutor().schedule(new Runnable() {
        @Override
        public void run() {
          boolean succ = false;
          try {
            succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
          } finally {
            // Release the pending-resubmission slot whatever happened, and count the put as
            // failed when it could not be queued again.
            FlushWorker.this.getRetryInQueue().decrementAndGet();
            if (!succ) {
              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
            }
          }
        }
      }, delayMs, TimeUnit.MILLISECONDS);
      return true;
    }

    /**
     * Returns the delay before the next resubmission, computed by
     * {@link ConnectionUtils#getPauseTime} from the flush period and the number of attempts
     * already consumed.
     */
    @InterfaceAudience.Private
    long getNextDelay(int retryCount) {
      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
        multiplexer.maxAttempts - retryCount - 1);
    }

    @InterfaceAudience.Private
    AtomicInteger getRetryInQueue() {
      return this.retryInQueue;
    }

    @InterfaceAudience.Private
    int getMaxRetryInQueue() {
      return this.maxRetryInQueue;
    }

    @InterfaceAudience.Private
    AtomicLong getTotalFailedPutCount() {
      return this.totalFailedPutCount;
    }

    @InterfaceAudience.Private
    HTableMultiplexer getMultiplexer() {
      return this.multiplexer;
    }

    @InterfaceAudience.Private
    ScheduledExecutorService getExecutor() {
      return this.executor;
    }

    /**
     * Drains the queue, sends the drained puts in one multi-action, resubmits the ones that got no
     * {@link Result}, and updates the latency and failure counters. Exceptions are logged at debug
     * level and never propagate out of this method.
     */
    @Override
    public void run() {
      int failedCount = 0;
      try {
        long start = EnvironmentEdgeManager.currentTime();

        // drain all the queued puts into the tmp list
        processingList.clear();
        queue.drainTo(processingList);
        if (processingList.isEmpty()) {
          // Nothing to flush
          return;
        }

        currentProcessingCount.set(processingList.size());
        // failedCount is decreased whenever a Put is success or resubmit.
        failedCount = processingList.size();

        List<Action> retainedActions = new ArrayList<>(processingList.size());
        MultiAction actions = new MultiAction();
        for (int i = 0; i < processingList.size(); i++) {
          PutStatus putStatus = processingList.get(i);
          // The action index i is the put's position in processingList, so results[i] below maps
          // back to processingList.get(i).
          Action action = new Action(putStatus.put, i);
          actions.add(putStatus.regionInfo.getRegionName(), action);
          retainedActions.add(action);
        }

        // Process this multi-put request
        List<PutStatus> failed = null;
        Object[] results = new Object[actions.size()];
        ServerName server = addr.getServerName();
        Map<ServerName, MultiAction> actionsByServer = Collections.singletonMap(server, actions);
        try {
          AsyncProcessTask task = AsyncProcessTask.newBuilder().setResults(results).setPool(pool)
            .setRpcTimeout(writeRpcTimeout).setOperationTimeout(operationTimeout).build();
          AsyncRequestFuture arf =
            ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
          arf.waitUntilDone();
          if (arf.hasError()) {
            // We just log and ignore the exception here since failed Puts will be resubmit again.
            LOG.debug("Caught some exceptions when flushing puts to region server "
              + addr.getHostnamePort(), arf.getErrors());
          }
        } finally {
          // Classify every slot even when the submission threw: a Result means success, anything
          // else (including an unset slot) means the put failed.
          for (int i = 0; i < results.length; i++) {
            if (results[i] instanceof Result) {
              failedCount--;
            } else {
              if (failed == null) {
                failed = new ArrayList<>();
              }
              failed.add(processingList.get(i));
            }
          }
        }

        if (failed != null) {
          // Resubmit failed puts
          for (PutStatus putStatus : failed) {
            if (resubmitFailedPut(putStatus, this.addr)) {
              failedCount--;
            }
          }
        }

        long elapsed = EnvironmentEdgeManager.currentTime() - start;
        // Update latency counters
        averageLatency.add(elapsed);
        if (elapsed > maxLatency.get()) {
          maxLatency.set(elapsed);
        }

        // Log some basic info
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Processed " + currentProcessingCount + " put requests for " + addr.getHostnamePort()
              + " and " + failedCount + " failed" + ", latency for this send: " + elapsed);
        }
      } catch (RuntimeException e) {
        // To make findbugs happy
        // Log all the exceptions and move on
        LOG.debug("Caught some exceptions " + e + " when flushing puts to region server "
          + addr.getHostnamePort(), e);
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        // Log all the exceptions and move on
        LOG.debug("Caught some exceptions " + e + " when flushing puts to region server "
          + addr.getHostnamePort(), e);
      } finally {
        // Update the totalFailedCount
        this.totalFailedPutCount.addAndGet(failedCount);
        // Reset the current processing put count on every exit path. Resetting only after a
        // successful flush left a stale value behind when an exception was thrown, and later
        // empty runs return early without clearing it, so getTotalBufferedCount() over-reported.
        currentProcessingCount.set(0);
      }
    }
  }
}