View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.client;
21  
22  import com.google.common.annotations.VisibleForTesting;
23  import com.google.common.util.concurrent.ThreadFactoryBuilder;
24  
25  import java.io.IOException;
26  import java.util.AbstractMap.SimpleEntry;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.HashMap;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.ScheduledExecutorService;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.atomic.AtomicLong;
40  
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.hbase.HBaseConfiguration;
45  import org.apache.hadoop.hbase.HConstants;
46  import org.apache.hadoop.hbase.HRegionInfo;
47  import org.apache.hadoop.hbase.HRegionLocation;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.classification.InterfaceStability;
52  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
53  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
54  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55  
56  /**
57   * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
58   * Each put will be sharded into different buffer queues based on its destination region server.
59   * So each region server buffer queue will only have the puts which share the same destination.
60   * And each queue will have a flush worker thread to flush the puts request to the region server.
61   * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that
62   * particular queue.
63   *
 * Also, all puts will be retried a configurable number of times before being dropped.
65   * And the HTableMultiplexer can report the number of buffered requests and the number of the
66   * failed (dropped) requests in total or on per region server basis.
67   *
68   * This class is thread safe.
69   */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HTableMultiplexer {
  private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());

  /** Config key: period (ms) between scheduled flushes of each per-server queue. */
  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
      "hbase.tablemultiplexer.flush.period.ms";
  /** Config key: initial size of the shared scheduler that runs the flush workers. */
  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
  /** Config key: cap on failed puts concurrently waiting for delayed resubmission. */
  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
      "hbase.client.max.retries.in.queue";

  /** The map between each region server to its flush worker */
  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
      new ConcurrentHashMap<>();

  // Copy of the user conf with client-level retries forced to 0; the workers handle
  // retries themselves so puts can be re-routed when regions move (see constructor).
  private final Configuration workerConf;
  private final ClusterConnection conn;
  private final ExecutorService pool;
  // Total tries allowed per put: configured retry number plus one.
  private final int maxAttempts;
  private final int perRegionServerBufferQueueSize;
  private final int maxKeyValueSize;
  // Shared scheduler that periodically runs every FlushWorker and delayed resubmits.
  private final ScheduledExecutorService executor;
  // Interval (ms) between flush runs for each worker.
  private final long flushPeriod;
93  
  /**
   * @param conf The HBaseConfiguration
   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
   *          each region server before dropping the request.
   * @throws IOException if the connection to the cluster cannot be created
   */
  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
      throws IOException {
    // This overload creates and therefore owns the Connection; close() closes it.
    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
  }
103 
  /**
   * @param conn The HBase connection.
   * @param conf The HBase configuration
   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
   *          each region server before dropping the request.
   */
  public HTableMultiplexer(Connection conn, Configuration conf,
      int perRegionServerBufferQueueSize) {
    this.conn = (ClusterConnection) conn;
    this.pool = HTable.getDefaultExecutor(conf);
    // how many times we could try in total, one more than retry number
    this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
    // How often each per-server queue is drained and flushed; defaults to 100ms.
    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
    // Daemon threads so an un-closed multiplexer does not keep the JVM alive.
    this.executor =
        Executors.newScheduledThreadPool(initThreads,
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());

    this.workerConf = HBaseConfiguration.create(conf);
    // We do not do the retry because we need to reassign puts to different queues if regions are
    // moved.
    this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
  }
130 
131   /**
132    * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
133    * been closed.
134    * @throws IOException If there is an error closing the connection.
135    */
136   @SuppressWarnings("deprecation")
137   public synchronized void close() throws IOException {
138     if (!getConnection().isClosed()) {
139       getConnection().close();
140     }
141   }
142 
143   /**
144    * The put request will be buffered by its corresponding buffer queue. Return false if the queue
145    * is already full.
146    * @param tableName
147    * @param put
148    * @return true if the request can be accepted by its corresponding buffer queue.
149    */
150   public boolean put(TableName tableName, final Put put) {
151     return put(tableName, put, this.maxAttempts);
152   }
153 
154   /**
155    * The puts request will be buffered by their corresponding buffer queue.
156    * Return the list of puts which could not be queued.
157    * @param tableName
158    * @param puts
159    * @return the list of puts which could not be queued
160    */
161   public List<Put> put(TableName tableName, final List<Put> puts) {
162     if (puts == null)
163       return null;
164 
165     List <Put> failedPuts = null;
166     boolean result;
167     for (Put put : puts) {
168       result = put(tableName, put, this.maxAttempts);
169       if (result == false) {
170 
171         // Create the failed puts list if necessary
172         if (failedPuts == null) {
173           failedPuts = new ArrayList<Put>();
174         }
175         // Add the put to the failed puts list
176         failedPuts.add(put);
177       }
178     }
179     return failedPuts;
180   }
181 
182   /**
183    * @deprecated Use {@link #put(TableName, List) } instead.
184    */
185   @Deprecated
186   public List<Put> put(byte[] tableName, final List<Put> puts) {
187     return put(TableName.valueOf(tableName), puts);
188   }
189 
190   /**
191    * The put request will be buffered by its corresponding buffer queue. And the put request will be
192    * retried before dropping the request.
193    * Return false if the queue is already full.
194    * @return true if the request can be accepted by its corresponding buffer queue.
195    */
196   public boolean put(final TableName tableName, final Put put, int maxAttempts) {
197     if (maxAttempts <= 0) {
198       return false;
199     }
200 
201     try {
202       HTable.validatePut(put, maxKeyValueSize);
203       // Allow mocking to get at the connection, but don't expose the connection to users.
204       ClusterConnection conn = (ClusterConnection) getConnection();
205       // AsyncProcess in the FlushWorker should take care of refreshing the location cache
206       // as necessary. We shouldn't have to do that here.
207       HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
208       if (loc != null) {
209         // Add the put pair into its corresponding queue.
210         LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
211 
212         // Generate a MultiPutStatus object and offer it into the queue
213         PutStatus s = new PutStatus(loc.getRegionInfo(), put, maxAttempts);
214 
215         return queue.offer(s);
216       }
217     } catch (IOException e) {
218       LOG.debug("Cannot process the put " + put, e);
219     }
220     return false;
221   }
222 
223   /**
224    * @deprecated Use {@link #put(TableName, Put) } instead.
225    */
226   @Deprecated
227   public boolean put(final byte[] tableName, final Put put, int retry) {
228     return put(TableName.valueOf(tableName), put, retry);
229   }
230 
231   /**
232    * @deprecated Use {@link #put(TableName, Put)} instead.
233    */
234   @Deprecated
235   public boolean put(final byte[] tableName, Put put) {
236     return put(TableName.valueOf(tableName), put);
237   }
238 
239   /**
240    * @return the current HTableMultiplexerStatus
241    */
242   public HTableMultiplexerStatus getHTableMultiplexerStatus() {
243     return new HTableMultiplexerStatus(serverToFlushWorkerMap);
244   }
245 
  /**
   * Returns the buffer queue for the given region server location, lazily creating and
   * scheduling its {@link FlushWorker} on first use.
   */
  @VisibleForTesting
  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
    // Double-checked lookup: the common path is a lock-free read of the concurrent map;
    // the synchronized block is only entered the first time a location is seen.
    FlushWorker worker = serverToFlushWorkerMap.get(addr);
    if (worker == null) {
      synchronized (this.serverToFlushWorkerMap) {
        // Re-check under the lock in case another thread created the worker meanwhile.
        worker = serverToFlushWorkerMap.get(addr);
        if (worker == null) {
          // Create the flush worker
          worker = new FlushWorker(workerConf, this.conn, addr, this,
              perRegionServerBufferQueueSize, pool, executor);
          this.serverToFlushWorkerMap.put(addr, worker);
          // Register in the map before scheduling so the first flush run always sees
          // a published worker; runs repeat every flushPeriod ms.
          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
        }
      }
    }
    return worker.getQueue();
  }
263 
264   @VisibleForTesting
265   ClusterConnection getConnection() {
266     return this.conn;
267   }
268 
  /**
   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
   * It reports the number of buffered requests and the number of the failed (dropped) requests
   * in total or on a per region server basis.
   *
   * Note: constructing an instance reads AND resets each worker's per-interval latency
   * counters (average via {@code getAndReset()}, max via {@code getMaxLatency()}), so two
   * back-to-back snapshots will not observe the same latency numbers.
   */
  @InterfaceAudience.Public
  @InterfaceStability.Evolving
  public static class HTableMultiplexerStatus {
    private long totalFailedPutCounter;
    private long totalBufferedPutCounter;
    private long maxLatency;
    private long overallAverageLatency;
    // Per-server breakdowns, keyed by "host:port".
    private Map<String, Long> serverToFailedCounterMap;
    private Map<String, Long> serverToBufferedCounterMap;
    private Map<String, Long> serverToAverageLatencyMap;
    private Map<String, Long> serverToMaxLatencyMap;

    public HTableMultiplexerStatus(
        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      this.totalBufferedPutCounter = 0;
      this.totalFailedPutCounter = 0;
      this.maxLatency = 0;
      this.overallAverageLatency = 0;
      this.serverToBufferedCounterMap = new HashMap<String, Long>();
      this.serverToFailedCounterMap = new HashMap<String, Long>();
      this.serverToAverageLatencyMap = new HashMap<String, Long>();
      this.serverToMaxLatencyMap = new HashMap<String, Long>();
      this.initialize(serverToFlushWorkerMap);
    }

    /**
     * Aggregates counters from every flush worker into the totals and per-server maps.
     * Tolerates a null map (leaves everything at zero/empty).
     */
    private void initialize(
        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      if (serverToFlushWorkerMap == null) {
        return;
      }

      // Overall average is computed from summed (sum, count) components rather than by
      // averaging the per-server averages, which would weight servers incorrectly.
      long averageCalcSum = 0;
      int averageCalcCount = 0;
      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
          .entrySet()) {
        HRegionLocation addr = entry.getKey();
        FlushWorker worker = entry.getValue();

        long bufferedCounter = worker.getTotalBufferedCount();
        long failedCounter = worker.getTotalFailedCount();
        // NOTE: getMaxLatency() resets the worker's max to 0 as a side effect.
        long serverMaxLatency = worker.getMaxLatency();
        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
        // Get sum and count pieces separately to compute overall average
        SimpleEntry<Long, Integer> averageComponents = averageCounter
            .getComponents();
        // NOTE: getAndReset() clears the worker's interval average as a side effect.
        long serverAvgLatency = averageCounter.getAndReset();

        this.totalBufferedPutCounter += bufferedCounter;
        this.totalFailedPutCounter += failedCounter;
        if (serverMaxLatency > this.maxLatency) {
          this.maxLatency = serverMaxLatency;
        }
        averageCalcSum += averageComponents.getKey();
        averageCalcCount += averageComponents.getValue();

        this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
            bufferedCounter);
        this.serverToFailedCounterMap
            .put(addr.getHostnamePort(),
            failedCounter);
        this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
            serverAvgLatency);
        this.serverToMaxLatencyMap
            .put(addr.getHostnamePort(),
            serverMaxLatency);
      }
      // Guard against division by zero when no latencies were recorded.
      this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
          / averageCalcCount : 0;
    }

    public long getTotalBufferedCounter() {
      return this.totalBufferedPutCounter;
    }

    public long getTotalFailedCounter() {
      return this.totalFailedPutCounter;
    }

    public long getMaxLatency() {
      return this.maxLatency;
    }

    public long getOverallAverageLatency() {
      return this.overallAverageLatency;
    }

    public Map<String, Long> getBufferedCounterForEachRegionServer() {
      return this.serverToBufferedCounterMap;
    }

    public Map<String, Long> getFailedCounterForEachRegionServer() {
      return this.serverToFailedCounterMap;
    }

    public Map<String, Long> getMaxLatencyForEachRegionServer() {
      return this.serverToMaxLatencyMap;
    }

    public Map<String, Long> getAverageLatencyForEachRegionServer() {
      return this.serverToAverageLatencyMap;
    }
  }
376 
  /**
   * Immutable triple of a put, the region it was routed to at queueing time, and the
   * number of attempts it has left before being dropped.
   */
  @VisibleForTesting
  static class PutStatus {
    // Region the put was destined for when queued (may be stale after a region move).
    final HRegionInfo regionInfo;
    final Put put;
    // Remaining attempts; decremented on each resubmit, the put is dropped at 0.
    final int maxAttempCount;

    public PutStatus(HRegionInfo regionInfo, Put put, int maxAttempCount) {
      this.regionInfo = regionInfo;
      this.put = put;
      this.maxAttempCount = maxAttempCount;
    }
  }
389 
390   /**
391    * Helper to count the average over an interval until reset.
392    */
393   private static class AtomicAverageCounter {
394     private long sum;
395     private int count;
396 
397     public AtomicAverageCounter() {
398       this.sum = 0L;
399       this.count = 0;
400     }
401 
402     public synchronized long getAndReset() {
403       long result = this.get();
404       this.reset();
405       return result;
406     }
407 
408     public synchronized long get() {
409       if (this.count == 0) {
410         return 0;
411       }
412       return this.sum / this.count;
413     }
414 
415     public synchronized SimpleEntry<Long, Integer> getComponents() {
416       return new SimpleEntry<Long, Integer>(sum, count);
417     }
418 
419     public synchronized void reset() {
420       this.sum = 0L;
421       this.count = 0;
422     }
423 
424     public synchronized void add(long value) {
425       this.sum += value;
426       this.count++;
427     }
428   }
429 
  /**
   * Periodic task that drains the buffer queue for one region server and flushes the
   * drained puts in a single multi-action request via {@link AsyncProcess}. Puts that
   * fail are rescheduled (with backoff) back through the multiplexer until their
   * attempt budget is exhausted.
   */
  @VisibleForTesting
  static class FlushWorker implements Runnable {
    // Region server this worker flushes to; also the key in serverToFlushWorkerMap.
    private final HRegionLocation addr;
    // Bounded buffer of pending puts for this server; offer() fails when full.
    private final LinkedBlockingQueue<PutStatus> queue;
    private final HTableMultiplexer multiplexer;
    // Cumulative count of puts dropped for good (budget exhausted or resubmit refused).
    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
    // Number of puts in the flush currently being processed; see getTotalBufferedCount().
    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
    // Per-interval flush latency stats; reset whenever a status snapshot is taken.
    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
    private final AtomicLong maxLatency = new AtomicLong(0);

    private final AsyncProcess ap;
    // Scratch list reused between runs to avoid reallocating on every flush.
    private final List<PutStatus> processingList = new ArrayList<>();
    // Shared scheduler, also used to delay resubmission of failed puts.
    private final ScheduledExecutorService executor;
    // Cap on puts concurrently waiting for delayed resubmission.
    private final int maxRetryInQueue;
    private final AtomicInteger retryInQueue = new AtomicInteger(0);
    private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor

    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
        HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
        ExecutorService pool, ScheduledExecutorService executor) {
      this.addr = addr;
      this.multiplexer = htableMultiplexer;
      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
      // Write timeout falls back to the generic RPC timeout when not set explicitly.
      this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
              HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout);
      this.executor = executor;
      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
    }

    /** @return the bounded buffer queue that feeds this worker */
    protected LinkedBlockingQueue<PutStatus> getQueue() {
      return this.queue;
    }

    public long getTotalFailedCount() {
      return totalFailedPutCount.get();
    }

    /** @return puts still queued plus puts in the flush currently being processed */
    public long getTotalBufferedCount() {
      return queue.size() + currentProcessingCount.get();
    }

    public AtomicAverageCounter getAverageLatencyCounter() {
      return this.averageLatency;
    }

    /** Returns the max flush latency seen since the last call, resetting it to 0. */
    public long getMaxLatency() {
      return this.maxLatency.getAndSet(0);
    }

    /**
     * Schedules a failed put to be re-queued through the multiplexer after a backoff
     * delay. Returns false when the put is dropped instead: either its attempt budget
     * is exhausted or too many puts are already waiting for resubmission.
     */
    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
      // Decrease the retry count
      final int retryCount = ps.maxAttempCount - 1;

      if (retryCount <= 0) {
        // Update the failed counter and no retry any more.
        return false;
      }

      // Reserve a resubmission slot; back out if the global cap is exceeded.
      int cnt = getRetryInQueue().incrementAndGet();
      if (cnt > getMaxRetryInQueue()) {
        // Too many Puts in queue for resubmit, give up this
        getRetryInQueue().decrementAndGet();
        return false;
      }

      final Put failedPut = ps.put;
      // The currentPut is failed. So get the table name for the currentPut.
      final TableName tableName = ps.regionInfo.getTable();

      long delayMs = getNextDelay(retryCount);
      if (LOG.isDebugEnabled()) {
        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
      }

      // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
      // the region location cache when the Put original failed with some exception. If we keep
      // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
      // that we expect it to.
      getExecutor().schedule(new Runnable() {
        @Override
        public void run() {
          boolean succ = false;
          try {
            // Going back through put() re-resolves the region location, so a moved
            // region gets routed to the correct (possibly different) worker queue.
            succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
          } finally {
            // Release the resubmission slot whether or not the re-queue succeeded.
            FlushWorker.this.getRetryInQueue().decrementAndGet();
            if (!succ) {
              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
            }
          }
        }
      }, delayMs, TimeUnit.MILLISECONDS);
      return true;
    }

    /**
     * Backoff delay before the next resubmit, based on how many attempts have already
     * been used (maxAttempts - retryCount - 1) and scaled from the flush period.
     */
    @VisibleForTesting
    long getNextDelay(int retryCount) {
      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
          multiplexer.maxAttempts - retryCount - 1);
    }

    @VisibleForTesting
    AtomicInteger getRetryInQueue() {
      return this.retryInQueue;
    }

    @VisibleForTesting
    int getMaxRetryInQueue() {
      return this.maxRetryInQueue;
    }

    @VisibleForTesting
    AtomicLong getTotalFailedPutCount() {
      return this.totalFailedPutCount;
    }

    @VisibleForTesting
    HTableMultiplexer getMultiplexer() {
      return this.multiplexer;
    }

    @VisibleForTesting
    ScheduledExecutorService getExecutor() {
      return this.executor;
    }

    /**
     * One flush cycle: drain the queue, send everything as a single multi-action to
     * this worker's region server, resubmit failures, and update latency/failure stats.
     * Exceptions are logged and swallowed so the periodic schedule keeps running.
     */
    @Override
    public void run() {
      int failedCount = 0;
      try {
        long start = EnvironmentEdgeManager.currentTime();

        // drain all the queued puts into the tmp list
        processingList.clear();
        queue.drainTo(processingList);
        if (processingList.size() == 0) {
          // Nothing to flush
          return;
        }

        currentProcessingCount.set(processingList.size());
        // failedCount is decreased whenever a Put is success or resubmit.
        failedCount = processingList.size();

        // Build one MultiAction containing every drained put; the action index i maps
        // each result in results[] back to processingList.get(i).
        List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
        MultiAction<Row> actions = new MultiAction<>();
        for (int i = 0; i < processingList.size(); i++) {
          PutStatus putStatus = processingList.get(i);
          Action<Row> action = new Action<Row>(putStatus.put, i);
          actions.add(putStatus.regionInfo.getRegionName(), action);
          retainedActions.add(action);
        }

        // Process this multi-put request
        List<PutStatus> failed = null;
        Object[] results = new Object[actions.size()];
        ServerName server = addr.getServerName();
        Map<ServerName, MultiAction<Row>> actionsByServer =
            Collections.singletonMap(server, actions);
        try {
          AsyncRequestFuture arf =
              ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
                null, actionsByServer, null);
          arf.waitUntilDone();
          if (arf.hasError()) {
            // We just log and ignore the exception here since failed Puts will be resubmit again.
            LOG.debug("Caught some exceptions when flushing puts to region server "
                + addr.getHostnamePort(), arf.getErrors());
          }
        } finally {
          // A slot holding a Result succeeded; anything else (error marker/null) failed.
          for (int i = 0; i < results.length; i++) {
            if (results[i] instanceof Result) {
              failedCount--;
            } else {
              if (failed == null) {
                failed = new ArrayList<PutStatus>();
              }
              failed.add(processingList.get(i));
            }
          }
        }

        if (failed != null) {
          // Resubmit failed puts
          for (PutStatus putStatus : failed) {
            if (resubmitFailedPut(putStatus, this.addr)) {
              // Successfully rescheduled puts are not counted as failed (yet).
              failedCount--;
            }
          }
        }

        long elapsed = EnvironmentEdgeManager.currentTime() - start;
        // Update latency counters
        averageLatency.add(elapsed);
        if (elapsed > maxLatency.get()) {
          maxLatency.set(elapsed);
        }

        // Log some basic info
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processed " + currentProcessingCount + " put requests for "
              + addr.getHostnamePort() + " and " + failedCount + " failed"
              + ", latency for this send: " + elapsed);
        }

        // Reset the current processing put count
        currentProcessingCount.set(0);
      } catch (RuntimeException e) {
        // To make findbugs happy
        // Log all the exceptions and move on
        LOG.debug(
          "Caught some exceptions " + e + " when flushing puts to region server "
              + addr.getHostnamePort(), e);
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          // Restore the interrupt flag so the executor can observe it.
          Thread.currentThread().interrupt();
        }
        // Log all the exceptions and move on
        LOG.debug(
          "Caught some exceptions " + e + " when flushing puts to region server "
              + addr.getHostnamePort(), e);
      } finally {
        // Update the totalFailedCount
        this.totalFailedPutCount.addAndGet(failedCount);
      }
    }
  }
661 }