001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020package org.apache.hadoop.hbase.client; 021 022import java.io.IOException; 023import java.util.AbstractMap.SimpleEntry; 024import java.util.ArrayList; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.LinkedBlockingQueue; 033import java.util.concurrent.ScheduledExecutorService; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicLong; 037 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.HBaseConfiguration; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.HRegionLocation; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 050import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 051 052/** 053 * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. 054 * Each put will be sharded into different buffer queues based on its destination region server. 055 * So each region server buffer queue will only have the puts which share the same destination. 056 * And each queue will have a flush worker thread to flush the puts request to the region server. 057 * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that 058 * particular queue. 059 * 060 * Also all the puts will be retried as a configuration number before dropping. 061 * And the HTableMultiplexer can report the number of buffered requests and the number of the 062 * failed (dropped) requests in total or on per region server basis. 063 * 064 * This class is thread safe. 065 */ 066@InterfaceAudience.Public 067public class HTableMultiplexer { 068 private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName()); 069 070 public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS = 071 "hbase.tablemultiplexer.flush.period.ms"; 072 public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads"; 073 public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE = 074 "hbase.client.max.retries.in.queue"; 075 076 /** The map between each region server to its flush worker */ 077 private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap = 078 new ConcurrentHashMap<>(); 079 080 private final Configuration workerConf; 081 private final ClusterConnection conn; 082 private final ExecutorService pool; 083 private final int maxAttempts; 084 private final int perRegionServerBufferQueueSize; 085 private final int maxKeyValueSize; 086 private final ScheduledExecutorService executor; 087 private final long flushPeriod; 088 089 /** 090 * @param conf The HBaseConfiguration 091 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for 092 * each region server before dropping the request. 093 */ 094 public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) 095 throws IOException { 096 this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize); 097 } 098 099 /** 100 * @param conn The HBase connection. 101 * @param conf The HBase configuration 102 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for 103 * each region server before dropping the request. 104 */ 105 public HTableMultiplexer(Connection conn, Configuration conf, 106 int perRegionServerBufferQueueSize) { 107 this.conn = (ClusterConnection) conn; 108 this.pool = HTable.getDefaultExecutor(conf); 109 // how many times we could try in total, one more than retry number 110 this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 111 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; 112 this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; 113 this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf); 114 this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100); 115 int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10); 116 this.executor = 117 Executors.newScheduledThreadPool(initThreads, 118 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build()); 119 120 this.workerConf = HBaseConfiguration.create(conf); 121 // We do not do the retry because we need to reassign puts to different queues if regions are 122 // moved. 123 this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 124 } 125 126 /** 127 * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already 128 * been closed. 129 * @throws IOException If there is an error closing the connection. 130 */ 131 @SuppressWarnings("deprecation") 132 public synchronized void close() throws IOException { 133 if (!getConnection().isClosed()) { 134 getConnection().close(); 135 } 136 } 137 138 /** 139 * The put request will be buffered by its corresponding buffer queue. Return false if the queue 140 * is already full. 141 * @param tableName 142 * @param put 143 * @return true if the request can be accepted by its corresponding buffer queue. 144 */ 145 public boolean put(TableName tableName, final Put put) { 146 return put(tableName, put, this.maxAttempts); 147 } 148 149 /** 150 * The puts request will be buffered by their corresponding buffer queue. 151 * Return the list of puts which could not be queued. 152 * @param tableName 153 * @param puts 154 * @return the list of puts which could not be queued 155 */ 156 public List<Put> put(TableName tableName, final List<Put> puts) { 157 if (puts == null) 158 return null; 159 160 List <Put> failedPuts = null; 161 boolean result; 162 for (Put put : puts) { 163 result = put(tableName, put, this.maxAttempts); 164 if (result == false) { 165 166 // Create the failed puts list if necessary 167 if (failedPuts == null) { 168 failedPuts = new ArrayList<>(); 169 } 170 // Add the put to the failed puts list 171 failedPuts.add(put); 172 } 173 } 174 return failedPuts; 175 } 176 177 /** 178 * @deprecated Use {@link #put(TableName, List) } instead. 179 */ 180 @Deprecated 181 public List<Put> put(byte[] tableName, final List<Put> puts) { 182 return put(TableName.valueOf(tableName), puts); 183 } 184 185 /** 186 * The put request will be buffered by its corresponding buffer queue. And the put request will be 187 * retried before dropping the request. 188 * Return false if the queue is already full. 189 * @return true if the request can be accepted by its corresponding buffer queue. 190 */ 191 public boolean put(final TableName tableName, final Put put, int maxAttempts) { 192 if (maxAttempts <= 0) { 193 return false; 194 } 195 196 try { 197 HTable.validatePut(put, maxKeyValueSize); 198 // Allow mocking to get at the connection, but don't expose the connection to users. 199 ClusterConnection conn = (ClusterConnection) getConnection(); 200 // AsyncProcess in the FlushWorker should take care of refreshing the location cache 201 // as necessary. We shouldn't have to do that here. 202 HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); 203 if (loc != null) { 204 // Add the put pair into its corresponding queue. 205 LinkedBlockingQueue<PutStatus> queue = getQueue(loc); 206 207 // Generate a MultiPutStatus object and offer it into the queue 208 PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts); 209 210 return queue.offer(s); 211 } 212 } catch (IOException e) { 213 LOG.debug("Cannot process the put " + put, e); 214 } 215 return false; 216 } 217 218 /** 219 * @deprecated Use {@link #put(TableName, Put) } instead. 220 */ 221 @Deprecated 222 public boolean put(final byte[] tableName, final Put put, int retry) { 223 return put(TableName.valueOf(tableName), put, retry); 224 } 225 226 /** 227 * @deprecated Use {@link #put(TableName, Put)} instead. 228 */ 229 @Deprecated 230 public boolean put(final byte[] tableName, Put put) { 231 return put(TableName.valueOf(tableName), put); 232 } 233 234 /** 235 * @return the current HTableMultiplexerStatus 236 */ 237 public HTableMultiplexerStatus getHTableMultiplexerStatus() { 238 return new HTableMultiplexerStatus(serverToFlushWorkerMap); 239 } 240 241 @VisibleForTesting 242 LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) { 243 FlushWorker worker = serverToFlushWorkerMap.get(addr); 244 if (worker == null) { 245 synchronized (this.serverToFlushWorkerMap) { 246 worker = serverToFlushWorkerMap.get(addr); 247 if (worker == null) { 248 // Create the flush worker 249 worker = new FlushWorker(workerConf, this.conn, addr, this, 250 perRegionServerBufferQueueSize, pool, executor); 251 this.serverToFlushWorkerMap.put(addr, worker); 252 executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS); 253 } 254 } 255 } 256 return worker.getQueue(); 257 } 258 259 @VisibleForTesting 260 ClusterConnection getConnection() { 261 return this.conn; 262 } 263 264 /** 265 * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. 266 * report the number of buffered requests and the number of the failed (dropped) requests 267 * in total or on per region server basis. 268 */ 269 @InterfaceAudience.Public 270 public static class HTableMultiplexerStatus { 271 private long totalFailedPutCounter; 272 private long totalBufferedPutCounter; 273 private long maxLatency; 274 private long overallAverageLatency; 275 private Map<String, Long> serverToFailedCounterMap; 276 private Map<String, Long> serverToBufferedCounterMap; 277 private Map<String, Long> serverToAverageLatencyMap; 278 private Map<String, Long> serverToMaxLatencyMap; 279 280 public HTableMultiplexerStatus( 281 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) { 282 this.totalBufferedPutCounter = 0; 283 this.totalFailedPutCounter = 0; 284 this.maxLatency = 0; 285 this.overallAverageLatency = 0; 286 this.serverToBufferedCounterMap = new HashMap<>(); 287 this.serverToFailedCounterMap = new HashMap<>(); 288 this.serverToAverageLatencyMap = new HashMap<>(); 289 this.serverToMaxLatencyMap = new HashMap<>(); 290 this.initialize(serverToFlushWorkerMap); 291 } 292 293 private void initialize( 294 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) { 295 if (serverToFlushWorkerMap == null) { 296 return; 297 } 298 299 long averageCalcSum = 0; 300 int averageCalcCount = 0; 301 for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap 302 .entrySet()) { 303 HRegionLocation addr = entry.getKey(); 304 FlushWorker worker = entry.getValue(); 305 306 long bufferedCounter = worker.getTotalBufferedCount(); 307 long failedCounter = worker.getTotalFailedCount(); 308 long serverMaxLatency = worker.getMaxLatency(); 309 AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter(); 310 // Get sum and count pieces separately to compute overall average 311 SimpleEntry<Long, Integer> averageComponents = averageCounter 312 .getComponents(); 313 long serverAvgLatency = averageCounter.getAndReset(); 314 315 this.totalBufferedPutCounter += bufferedCounter; 316 this.totalFailedPutCounter += failedCounter; 317 if (serverMaxLatency > this.maxLatency) { 318 this.maxLatency = serverMaxLatency; 319 } 320 averageCalcSum += averageComponents.getKey(); 321 averageCalcCount += averageComponents.getValue(); 322 323 this.serverToBufferedCounterMap.put(addr.getHostnamePort(), 324 bufferedCounter); 325 this.serverToFailedCounterMap 326 .put(addr.getHostnamePort(), 327 failedCounter); 328 this.serverToAverageLatencyMap.put(addr.getHostnamePort(), 329 serverAvgLatency); 330 this.serverToMaxLatencyMap 331 .put(addr.getHostnamePort(), 332 serverMaxLatency); 333 } 334 this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum 335 / averageCalcCount : 0; 336 } 337 338 public long getTotalBufferedCounter() { 339 return this.totalBufferedPutCounter; 340 } 341 342 public long getTotalFailedCounter() { 343 return this.totalFailedPutCounter; 344 } 345 346 public long getMaxLatency() { 347 return this.maxLatency; 348 } 349 350 public long getOverallAverageLatency() { 351 return this.overallAverageLatency; 352 } 353 354 public Map<String, Long> getBufferedCounterForEachRegionServer() { 355 return this.serverToBufferedCounterMap; 356 } 357 358 public Map<String, Long> getFailedCounterForEachRegionServer() { 359 return this.serverToFailedCounterMap; 360 } 361 362 public Map<String, Long> getMaxLatencyForEachRegionServer() { 363 return this.serverToMaxLatencyMap; 364 } 365 366 public Map<String, Long> getAverageLatencyForEachRegionServer() { 367 return this.serverToAverageLatencyMap; 368 } 369 } 370 371 @VisibleForTesting 372 static class PutStatus { 373 final RegionInfo regionInfo; 374 final Put put; 375 final int maxAttempCount; 376 377 public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) { 378 this.regionInfo = regionInfo; 379 this.put = put; 380 this.maxAttempCount = maxAttempCount; 381 } 382 } 383 384 /** 385 * Helper to count the average over an interval until reset. 386 */ 387 private static class AtomicAverageCounter { 388 private long sum; 389 private int count; 390 391 public AtomicAverageCounter() { 392 this.sum = 0L; 393 this.count = 0; 394 } 395 396 public synchronized long getAndReset() { 397 long result = this.get(); 398 this.reset(); 399 return result; 400 } 401 402 public synchronized long get() { 403 if (this.count == 0) { 404 return 0; 405 } 406 return this.sum / this.count; 407 } 408 409 public synchronized SimpleEntry<Long, Integer> getComponents() { 410 return new SimpleEntry<>(sum, count); 411 } 412 413 public synchronized void reset() { 414 this.sum = 0L; 415 this.count = 0; 416 } 417 418 public synchronized void add(long value) { 419 this.sum += value; 420 this.count++; 421 } 422 } 423 424 @VisibleForTesting 425 static class FlushWorker implements Runnable { 426 private final HRegionLocation addr; 427 private final LinkedBlockingQueue<PutStatus> queue; 428 private final HTableMultiplexer multiplexer; 429 private final AtomicLong totalFailedPutCount = new AtomicLong(0); 430 private final AtomicInteger currentProcessingCount = new AtomicInteger(0); 431 private final AtomicAverageCounter averageLatency = new AtomicAverageCounter(); 432 private final AtomicLong maxLatency = new AtomicLong(0); 433 434 private final AsyncProcess ap; 435 private final List<PutStatus> processingList = new ArrayList<>(); 436 private final ScheduledExecutorService executor; 437 private final int maxRetryInQueue; 438 private final AtomicInteger retryInQueue = new AtomicInteger(0); 439 private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor 440 private final int operationTimeout; 441 private final ExecutorService pool; 442 public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, 443 HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, 444 ExecutorService pool, ScheduledExecutorService executor) { 445 this.addr = addr; 446 this.multiplexer = htableMultiplexer; 447 this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); 448 RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); 449 RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); 450 this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 451 conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 452 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 453 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 454 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 455 this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory); 456 this.executor = executor; 457 this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); 458 this.pool = pool; 459 } 460 461 protected LinkedBlockingQueue<PutStatus> getQueue() { 462 return this.queue; 463 } 464 465 public long getTotalFailedCount() { 466 return totalFailedPutCount.get(); 467 } 468 469 public long getTotalBufferedCount() { 470 return (long) queue.size() + currentProcessingCount.get(); 471 } 472 473 public AtomicAverageCounter getAverageLatencyCounter() { 474 return this.averageLatency; 475 } 476 477 public long getMaxLatency() { 478 return this.maxLatency.getAndSet(0); 479 } 480 481 boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException { 482 // Decrease the retry count 483 final int retryCount = ps.maxAttempCount - 1; 484 485 if (retryCount <= 0) { 486 // Update the failed counter and no retry any more. 487 return false; 488 } 489 490 int cnt = getRetryInQueue().incrementAndGet(); 491 if (cnt > getMaxRetryInQueue()) { 492 // Too many Puts in queue for resubmit, give up this 493 getRetryInQueue().decrementAndGet(); 494 return false; 495 } 496 497 final Put failedPut = ps.put; 498 // The currentPut is failed. So get the table name for the currentPut. 499 final TableName tableName = ps.regionInfo.getTable(); 500 501 long delayMs = getNextDelay(retryCount); 502 if (LOG.isDebugEnabled()) { 503 LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); 504 } 505 506 // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating 507 // the region location cache when the Put original failed with some exception. If we keep 508 // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff 509 // that we expect it to. 510 getExecutor().schedule(new Runnable() { 511 @Override 512 public void run() { 513 boolean succ = false; 514 try { 515 succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount); 516 } finally { 517 FlushWorker.this.getRetryInQueue().decrementAndGet(); 518 if (!succ) { 519 FlushWorker.this.getTotalFailedPutCount().incrementAndGet(); 520 } 521 } 522 } 523 }, delayMs, TimeUnit.MILLISECONDS); 524 return true; 525 } 526 527 @VisibleForTesting 528 long getNextDelay(int retryCount) { 529 return ConnectionUtils.getPauseTime(multiplexer.flushPeriod, 530 multiplexer.maxAttempts - retryCount - 1); 531 } 532 533 @VisibleForTesting 534 AtomicInteger getRetryInQueue() { 535 return this.retryInQueue; 536 } 537 538 @VisibleForTesting 539 int getMaxRetryInQueue() { 540 return this.maxRetryInQueue; 541 } 542 543 @VisibleForTesting 544 AtomicLong getTotalFailedPutCount() { 545 return this.totalFailedPutCount; 546 } 547 548 @VisibleForTesting 549 HTableMultiplexer getMultiplexer() { 550 return this.multiplexer; 551 } 552 553 @VisibleForTesting 554 ScheduledExecutorService getExecutor() { 555 return this.executor; 556 } 557 558 @Override 559 public void run() { 560 int failedCount = 0; 561 try { 562 long start = EnvironmentEdgeManager.currentTime(); 563 564 // drain all the queued puts into the tmp list 565 processingList.clear(); 566 queue.drainTo(processingList); 567 if (processingList.isEmpty()) { 568 // Nothing to flush 569 return; 570 } 571 572 currentProcessingCount.set(processingList.size()); 573 // failedCount is decreased whenever a Put is success or resubmit. 574 failedCount = processingList.size(); 575 576 List<Action> retainedActions = new ArrayList<>(processingList.size()); 577 MultiAction actions = new MultiAction(); 578 for (int i = 0; i < processingList.size(); i++) { 579 PutStatus putStatus = processingList.get(i); 580 Action action = new Action(putStatus.put, i); 581 actions.add(putStatus.regionInfo.getRegionName(), action); 582 retainedActions.add(action); 583 } 584 585 // Process this multi-put request 586 List<PutStatus> failed = null; 587 Object[] results = new Object[actions.size()]; 588 ServerName server = addr.getServerName(); 589 Map<ServerName, MultiAction> actionsByServer = 590 Collections.singletonMap(server, actions); 591 try { 592 AsyncProcessTask task = AsyncProcessTask.newBuilder() 593 .setResults(results) 594 .setPool(pool) 595 .setRpcTimeout(writeRpcTimeout) 596 .setOperationTimeout(operationTimeout) 597 .build(); 598 AsyncRequestFuture arf = 599 ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer); 600 arf.waitUntilDone(); 601 if (arf.hasError()) { 602 // We just log and ignore the exception here since failed Puts will be resubmit again. 603 LOG.debug("Caught some exceptions when flushing puts to region server " 604 + addr.getHostnamePort(), arf.getErrors()); 605 } 606 } finally { 607 for (int i = 0; i < results.length; i++) { 608 if (results[i] instanceof Result) { 609 failedCount--; 610 } else { 611 if (failed == null) { 612 failed = new ArrayList<>(); 613 } 614 failed.add(processingList.get(i)); 615 } 616 } 617 } 618 619 if (failed != null) { 620 // Resubmit failed puts 621 for (PutStatus putStatus : failed) { 622 if (resubmitFailedPut(putStatus, this.addr)) { 623 failedCount--; 624 } 625 } 626 } 627 628 long elapsed = EnvironmentEdgeManager.currentTime() - start; 629 // Update latency counters 630 averageLatency.add(elapsed); 631 if (elapsed > maxLatency.get()) { 632 maxLatency.set(elapsed); 633 } 634 635 // Log some basic info 636 if (LOG.isDebugEnabled()) { 637 LOG.debug("Processed " + currentProcessingCount + " put requests for " 638 + addr.getHostnamePort() + " and " + failedCount + " failed" 639 + ", latency for this send: " + elapsed); 640 } 641 642 // Reset the current processing put count 643 currentProcessingCount.set(0); 644 } catch (RuntimeException e) { 645 // To make findbugs happy 646 // Log all the exceptions and move on 647 LOG.debug( 648 "Caught some exceptions " + e + " when flushing puts to region server " 649 + addr.getHostnamePort(), e); 650 } catch (Exception e) { 651 if (e instanceof InterruptedException) { 652 Thread.currentThread().interrupt(); 653 } 654 // Log all the exceptions and move on 655 LOG.debug( 656 "Caught some exceptions " + e + " when flushing puts to region server " 657 + addr.getHostnamePort(), e); 658 } finally { 659 // Update the totalFailedCount 660 this.totalFailedPutCount.addAndGet(failedCount); 661 } 662 } 663 } 664}