001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020package org.apache.hadoop.hbase.client;
021
022import java.io.IOException;
023import java.util.AbstractMap.SimpleEntry;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.LinkedBlockingQueue;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicLong;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
/**
 * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all the tables. Each put
 * is sharded into a buffer queue based on its destination region server, so each region server
 * buffer queue only holds the puts which share the same destination. Each queue has a flush
 * worker thread which flushes the buffered put requests to the region server. If any queue is
 * full, the HTableMultiplexer starts to drop the Put requests for that particular queue.
 * <p>
 * Every put is retried a configurable number of times before it is dropped. The
 * HTableMultiplexer can report the number of buffered requests and the number of failed
 * (dropped) requests, either in total or on a per region server basis.
 * <p>
 * This class is thread safe.
 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use
 *             {@link BufferedMutator} for batching mutations.
 */
065@Deprecated
066@InterfaceAudience.Public
067public class HTableMultiplexer {
068  private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName());
069
070  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
071      "hbase.tablemultiplexer.flush.period.ms";
072  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
073  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
074      "hbase.client.max.retries.in.queue";
075
076  /** The map between each region server to its flush worker */
077  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
078      new ConcurrentHashMap<>();
079
080  private final Configuration workerConf;
081  private final ClusterConnection conn;
082  private final ExecutorService pool;
083  private final int maxAttempts;
084  private final int perRegionServerBufferQueueSize;
085  private final int maxKeyValueSize;
086  private final ScheduledExecutorService executor;
087  private final long flushPeriod;
088
089  /**
090   * @param conf The HBaseConfiguration
091   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
092   *          each region server before dropping the request.
093   */
094  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
095      throws IOException {
096    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
097  }
098
099  /**
100   * @param conn The HBase connection.
101   * @param conf The HBase configuration
102   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
103   *          each region server before dropping the request.
104   */
105  public HTableMultiplexer(Connection conn, Configuration conf,
106      int perRegionServerBufferQueueSize) {
107    this.conn = (ClusterConnection) conn;
108    this.pool = HTable.getDefaultExecutor(conf);
109    // how many times we could try in total, one more than retry number
110    this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
111        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
112    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
113    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
114    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
115    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
116    this.executor =
117        Executors.newScheduledThreadPool(initThreads,
118          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
119
120    this.workerConf = HBaseConfiguration.create(conf);
121    // We do not do the retry because we need to reassign puts to different queues if regions are
122    // moved.
123    this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
124  }
125
126  /**
127   * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
128   * been closed.
129   * @throws IOException If there is an error closing the connection.
130   */
131  public synchronized void close() throws IOException {
132    if (!getConnection().isClosed()) {
133      getConnection().close();
134    }
135  }
136
137  /**
138   * The put request will be buffered by its corresponding buffer queue. Return false if the queue
139   * is already full.
140   * @param tableName
141   * @param put
142   * @return true if the request can be accepted by its corresponding buffer queue.
143   */
144  public boolean put(TableName tableName, final Put put) {
145    return put(tableName, put, this.maxAttempts);
146  }
147
148  /**
149   * The puts request will be buffered by their corresponding buffer queue.
150   * Return the list of puts which could not be queued.
151   * @param tableName
152   * @param puts
153   * @return the list of puts which could not be queued
154   */
155  public List<Put> put(TableName tableName, final List<Put> puts) {
156    if (puts == null)
157      return null;
158
159    List <Put> failedPuts = null;
160    boolean result;
161    for (Put put : puts) {
162      result = put(tableName, put, this.maxAttempts);
163      if (result == false) {
164
165        // Create the failed puts list if necessary
166        if (failedPuts == null) {
167          failedPuts = new ArrayList<>();
168        }
169        // Add the put to the failed puts list
170        failedPuts.add(put);
171      }
172    }
173    return failedPuts;
174  }
175
176  /**
177   * @deprecated Use {@link #put(TableName, List) } instead.
178   */
179  @Deprecated
180  public List<Put> put(byte[] tableName, final List<Put> puts) {
181    return put(TableName.valueOf(tableName), puts);
182  }
183
184  /**
185   * The put request will be buffered by its corresponding buffer queue. And the put request will be
186   * retried before dropping the request.
187   * Return false if the queue is already full.
188   * @return true if the request can be accepted by its corresponding buffer queue.
189   */
190  public boolean put(final TableName tableName, final Put put, int maxAttempts) {
191    if (maxAttempts <= 0) {
192      return false;
193    }
194
195    try {
196      ConnectionUtils.validatePut(put, maxKeyValueSize);
197      // Allow mocking to get at the connection, but don't expose the connection to users.
198      ClusterConnection conn = (ClusterConnection) getConnection();
199      // AsyncProcess in the FlushWorker should take care of refreshing the location cache
200      // as necessary. We shouldn't have to do that here.
201      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
202      if (loc != null) {
203        // Add the put pair into its corresponding queue.
204        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
205
206        // Generate a MultiPutStatus object and offer it into the queue
207        PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts);
208
209        return queue.offer(s);
210      }
211    } catch (IOException e) {
212      LOG.debug("Cannot process the put " + put, e);
213    }
214    return false;
215  }
216
217  /**
218   * @deprecated Use {@link #put(TableName, Put) } instead.
219   */
220  @Deprecated
221  public boolean put(final byte[] tableName, final Put put, int retry) {
222    return put(TableName.valueOf(tableName), put, retry);
223  }
224
225  /**
226   * @deprecated Use {@link #put(TableName, Put)} instead.
227   */
228  @Deprecated
229  public boolean put(final byte[] tableName, Put put) {
230    return put(TableName.valueOf(tableName), put);
231  }
232
233  /**
234   * @return the current HTableMultiplexerStatus
235   */
236  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
237    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
238  }
239
240  @InterfaceAudience.Private
241  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
242    FlushWorker worker = serverToFlushWorkerMap.get(addr);
243    if (worker == null) {
244      synchronized (this.serverToFlushWorkerMap) {
245        worker = serverToFlushWorkerMap.get(addr);
246        if (worker == null) {
247          // Create the flush worker
248          worker = new FlushWorker(workerConf, this.conn, addr, this,
249              perRegionServerBufferQueueSize, pool, executor);
250          this.serverToFlushWorkerMap.put(addr, worker);
251          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
252        }
253      }
254    }
255    return worker.getQueue();
256  }
257
258  @InterfaceAudience.Private
259  ClusterConnection getConnection() {
260    return this.conn;
261  }
262
263  /**
264   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. report the
265   * number of buffered requests and the number of the failed (dropped) requests in total or on per
266   * region server basis.
267   * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use
268   *             {@link BufferedMutator} for batching mutations.
269   */
270  @Deprecated
271  @InterfaceAudience.Public
272  public static class HTableMultiplexerStatus {
273    private long totalFailedPutCounter;
274    private long totalBufferedPutCounter;
275    private long maxLatency;
276    private long overallAverageLatency;
277    private Map<String, Long> serverToFailedCounterMap;
278    private Map<String, Long> serverToBufferedCounterMap;
279    private Map<String, Long> serverToAverageLatencyMap;
280    private Map<String, Long> serverToMaxLatencyMap;
281
282    public HTableMultiplexerStatus(
283        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
284      this.totalBufferedPutCounter = 0;
285      this.totalFailedPutCounter = 0;
286      this.maxLatency = 0;
287      this.overallAverageLatency = 0;
288      this.serverToBufferedCounterMap = new HashMap<>();
289      this.serverToFailedCounterMap = new HashMap<>();
290      this.serverToAverageLatencyMap = new HashMap<>();
291      this.serverToMaxLatencyMap = new HashMap<>();
292      this.initialize(serverToFlushWorkerMap);
293    }
294
295    private void initialize(
296        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
297      if (serverToFlushWorkerMap == null) {
298        return;
299      }
300
301      long averageCalcSum = 0;
302      int averageCalcCount = 0;
303      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
304          .entrySet()) {
305        HRegionLocation addr = entry.getKey();
306        FlushWorker worker = entry.getValue();
307
308        long bufferedCounter = worker.getTotalBufferedCount();
309        long failedCounter = worker.getTotalFailedCount();
310        long serverMaxLatency = worker.getMaxLatency();
311        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
312        // Get sum and count pieces separately to compute overall average
313        SimpleEntry<Long, Integer> averageComponents = averageCounter
314            .getComponents();
315        long serverAvgLatency = averageCounter.getAndReset();
316
317        this.totalBufferedPutCounter += bufferedCounter;
318        this.totalFailedPutCounter += failedCounter;
319        if (serverMaxLatency > this.maxLatency) {
320          this.maxLatency = serverMaxLatency;
321        }
322        averageCalcSum += averageComponents.getKey();
323        averageCalcCount += averageComponents.getValue();
324
325        this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
326            bufferedCounter);
327        this.serverToFailedCounterMap
328            .put(addr.getHostnamePort(),
329            failedCounter);
330        this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
331            serverAvgLatency);
332        this.serverToMaxLatencyMap
333            .put(addr.getHostnamePort(),
334            serverMaxLatency);
335      }
336      this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
337          / averageCalcCount : 0;
338    }
339
340    public long getTotalBufferedCounter() {
341      return this.totalBufferedPutCounter;
342    }
343
344    public long getTotalFailedCounter() {
345      return this.totalFailedPutCounter;
346    }
347
348    public long getMaxLatency() {
349      return this.maxLatency;
350    }
351
352    public long getOverallAverageLatency() {
353      return this.overallAverageLatency;
354    }
355
356    public Map<String, Long> getBufferedCounterForEachRegionServer() {
357      return this.serverToBufferedCounterMap;
358    }
359
360    public Map<String, Long> getFailedCounterForEachRegionServer() {
361      return this.serverToFailedCounterMap;
362    }
363
364    public Map<String, Long> getMaxLatencyForEachRegionServer() {
365      return this.serverToMaxLatencyMap;
366    }
367
368    public Map<String, Long> getAverageLatencyForEachRegionServer() {
369      return this.serverToAverageLatencyMap;
370    }
371  }
372
373  @InterfaceAudience.Private
374  static class PutStatus {
375    final RegionInfo regionInfo;
376    final Put put;
377    final int maxAttempCount;
378
379    public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) {
380      this.regionInfo = regionInfo;
381      this.put = put;
382      this.maxAttempCount = maxAttempCount;
383    }
384  }
385
386  /**
387   * Helper to count the average over an interval until reset.
388   */
389  private static class AtomicAverageCounter {
390    private long sum;
391    private int count;
392
393    public AtomicAverageCounter() {
394      this.sum = 0L;
395      this.count = 0;
396    }
397
398    public synchronized long getAndReset() {
399      long result = this.get();
400      this.reset();
401      return result;
402    }
403
404    public synchronized long get() {
405      if (this.count == 0) {
406        return 0;
407      }
408      return this.sum / this.count;
409    }
410
411    public synchronized SimpleEntry<Long, Integer> getComponents() {
412      return new SimpleEntry<>(sum, count);
413    }
414
415    public synchronized void reset() {
416      this.sum = 0L;
417      this.count = 0;
418    }
419
420    public synchronized void add(long value) {
421      this.sum += value;
422      this.count++;
423    }
424  }
425
426  @InterfaceAudience.Private
427  static class FlushWorker implements Runnable {
428    private final HRegionLocation addr;
429    private final LinkedBlockingQueue<PutStatus> queue;
430    private final HTableMultiplexer multiplexer;
431    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
432    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
433    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
434    private final AtomicLong maxLatency = new AtomicLong(0);
435
436    private final AsyncProcess ap;
437    private final List<PutStatus> processingList = new ArrayList<>();
438    private final ScheduledExecutorService executor;
439    private final int maxRetryInQueue;
440    private final AtomicInteger retryInQueue = new AtomicInteger(0);
441    private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
442    private final int operationTimeout;
443    private final ExecutorService pool;
444    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
445        HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
446        ExecutorService pool, ScheduledExecutorService executor) {
447      this.addr = addr;
448      this.multiplexer = htableMultiplexer;
449      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
450      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
451      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
452      this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
453          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
454              HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
455      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
456          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
457      this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory);
458      this.executor = executor;
459      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
460      this.pool = pool;
461    }
462
463    protected LinkedBlockingQueue<PutStatus> getQueue() {
464      return this.queue;
465    }
466
467    public long getTotalFailedCount() {
468      return totalFailedPutCount.get();
469    }
470
471    public long getTotalBufferedCount() {
472      return (long) queue.size() + currentProcessingCount.get();
473    }
474
475    public AtomicAverageCounter getAverageLatencyCounter() {
476      return this.averageLatency;
477    }
478
479    public long getMaxLatency() {
480      return this.maxLatency.getAndSet(0);
481    }
482
483    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
484      // Decrease the retry count
485      final int retryCount = ps.maxAttempCount - 1;
486
487      if (retryCount <= 0) {
488        // Update the failed counter and no retry any more.
489        return false;
490      }
491
492      int cnt = getRetryInQueue().incrementAndGet();
493      if (cnt > getMaxRetryInQueue()) {
494        // Too many Puts in queue for resubmit, give up this
495        getRetryInQueue().decrementAndGet();
496        return false;
497      }
498
499      final Put failedPut = ps.put;
500      // The currentPut is failed. So get the table name for the currentPut.
501      final TableName tableName = ps.regionInfo.getTable();
502
503      long delayMs = getNextDelay(retryCount);
504      if (LOG.isDebugEnabled()) {
505        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
506      }
507
508      // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
509      // the region location cache when the Put original failed with some exception. If we keep
510      // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
511      // that we expect it to.
512      getExecutor().schedule(new Runnable() {
513        @Override
514        public void run() {
515          boolean succ = false;
516          try {
517            succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
518          } finally {
519            FlushWorker.this.getRetryInQueue().decrementAndGet();
520            if (!succ) {
521              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
522            }
523          }
524        }
525      }, delayMs, TimeUnit.MILLISECONDS);
526      return true;
527    }
528
529    @InterfaceAudience.Private
530    long getNextDelay(int retryCount) {
531      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
532          multiplexer.maxAttempts - retryCount - 1);
533    }
534
535    @InterfaceAudience.Private
536    AtomicInteger getRetryInQueue() {
537      return this.retryInQueue;
538    }
539
540    @InterfaceAudience.Private
541    int getMaxRetryInQueue() {
542      return this.maxRetryInQueue;
543    }
544
545    @InterfaceAudience.Private
546    AtomicLong getTotalFailedPutCount() {
547      return this.totalFailedPutCount;
548    }
549
550    @InterfaceAudience.Private
551    HTableMultiplexer getMultiplexer() {
552      return this.multiplexer;
553    }
554
555    @InterfaceAudience.Private
556    ScheduledExecutorService getExecutor() {
557      return this.executor;
558    }
559
560    @Override
561    public void run() {
562      int failedCount = 0;
563      try {
564        long start = EnvironmentEdgeManager.currentTime();
565
566        // drain all the queued puts into the tmp list
567        processingList.clear();
568        queue.drainTo(processingList);
569        if (processingList.isEmpty()) {
570          // Nothing to flush
571          return;
572        }
573
574        currentProcessingCount.set(processingList.size());
575        // failedCount is decreased whenever a Put is success or resubmit.
576        failedCount = processingList.size();
577
578        List<Action> retainedActions = new ArrayList<>(processingList.size());
579        MultiAction actions = new MultiAction();
580        for (int i = 0; i < processingList.size(); i++) {
581          PutStatus putStatus = processingList.get(i);
582          Action action = new Action(putStatus.put, i);
583          actions.add(putStatus.regionInfo.getRegionName(), action);
584          retainedActions.add(action);
585        }
586
587        // Process this multi-put request
588        List<PutStatus> failed = null;
589        Object[] results = new Object[actions.size()];
590        ServerName server = addr.getServerName();
591        Map<ServerName, MultiAction> actionsByServer =
592            Collections.singletonMap(server, actions);
593        try {
594          AsyncProcessTask task = AsyncProcessTask.newBuilder()
595                  .setResults(results)
596                  .setPool(pool)
597                  .setRpcTimeout(writeRpcTimeout)
598                  .setOperationTimeout(operationTimeout)
599                  .build();
600          AsyncRequestFuture arf =
601              ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
602          arf.waitUntilDone();
603          if (arf.hasError()) {
604            // We just log and ignore the exception here since failed Puts will be resubmit again.
605            LOG.debug("Caught some exceptions when flushing puts to region server "
606                + addr.getHostnamePort(), arf.getErrors());
607          }
608        } finally {
609          for (int i = 0; i < results.length; i++) {
610            if (results[i] instanceof Result) {
611              failedCount--;
612            } else {
613              if (failed == null) {
614                failed = new ArrayList<>();
615              }
616              failed.add(processingList.get(i));
617            }
618          }
619        }
620
621        if (failed != null) {
622          // Resubmit failed puts
623          for (PutStatus putStatus : failed) {
624            if (resubmitFailedPut(putStatus, this.addr)) {
625              failedCount--;
626            }
627          }
628        }
629
630        long elapsed = EnvironmentEdgeManager.currentTime() - start;
631        // Update latency counters
632        averageLatency.add(elapsed);
633        if (elapsed > maxLatency.get()) {
634          maxLatency.set(elapsed);
635        }
636
637        // Log some basic info
638        if (LOG.isDebugEnabled()) {
639          LOG.debug("Processed " + currentProcessingCount + " put requests for "
640              + addr.getHostnamePort() + " and " + failedCount + " failed"
641              + ", latency for this send: " + elapsed);
642        }
643
644        // Reset the current processing put count
645        currentProcessingCount.set(0);
646      } catch (RuntimeException e) {
647        // To make findbugs happy
648        // Log all the exceptions and move on
649        LOG.debug(
650          "Caught some exceptions " + e + " when flushing puts to region server "
651              + addr.getHostnamePort(), e);
652      } catch (Exception e) {
653        if (e instanceof InterruptedException) {
654          Thread.currentThread().interrupt();
655        }
656        // Log all the exceptions and move on
657        LOG.debug(
658          "Caught some exceptions " + e + " when flushing puts to region server "
659              + addr.getHostnamePort(), e);
660      } finally {
661        // Update the totalFailedCount
662        this.totalFailedPutCount.addAndGet(failedCount);
663      }
664    }
665  }
666}