/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all the tables. Each put
 * is sharded into a different buffer queue based on its destination region server, so each region
 * server buffer queue only holds puts that share the same destination. Each queue has a flush
 * worker thread that flushes the buffered put requests to the region server. If any queue is
 * full, the HTableMultiplexer starts to drop the Put requests for that particular queue.
 * <p>
 * Every put is retried a configurable number of times before being dropped, and the
 * HTableMultiplexer can report the number of buffered requests and the number of failed
 * (dropped) requests, either in total or on a per-region-server basis.
 * <p>
 * This class is thread safe.
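 * <p>
 * A minimal usage sketch, assuming an existing {@link Configuration} {@code conf} (the table,
 * row and column names below are hypothetical):
 * <pre>
 * HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 1000);
 * Put put = new Put(Bytes.toBytes("row1"));
 * put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
 * boolean queued = multiplexer.put(TableName.valueOf("my_table"), put);
 * // queued == false means the destination queue was full and the put was rejected
 * </pre>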
 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use
 *             {@link BufferedMutator} for batching mutations.
 */
@Deprecated
@InterfaceAudience.Public
public class HTableMultiplexer {
  private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName());

  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
      "hbase.tablemultiplexer.flush.period.ms";
  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
      "hbase.client.max.retries.in.queue";

  /** The map from each region server to its flush worker */
  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
      new ConcurrentHashMap<>();

  private final Configuration workerConf;
  private final ClusterConnection conn;
  private final ExecutorService pool;
  private final int maxAttempts;
  private final int perRegionServerBufferQueueSize;
  private final int maxKeyValueSize;
  private final ScheduledExecutorService executor;
  private final long flushPeriod;

  /**
   * @param conf The HBase configuration
   * @param perRegionServerBufferQueueSize determines the max number of buffered Put ops for each
   *          region server before dropping the request.
   */
  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
      throws IOException {
    this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize);
  }

  /**
   * @param conn The HBase connection.
   * @param conf The HBase configuration
   * @param perRegionServerBufferQueueSize determines the max number of buffered Put ops for each
   *          region server before dropping the request.
   */
  public HTableMultiplexer(Connection conn, Configuration conf,
      int perRegionServerBufferQueueSize) {
    this.conn = (ClusterConnection) conn;
    this.pool = HTable.getDefaultExecutor(conf);
    // how many times we could try in total, one more than the retry number
    this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
    this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
    this.executor =
        Executors.newScheduledThreadPool(initThreads,
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());

    this.workerConf = HBaseConfiguration.create(conf);
    // The worker connection does not retry: puts must be reassigned to different queues if
    // regions move.
    this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
  }

  /**
   * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already
   * been closed.
   * @throws IOException If there is an error closing the connection.
   */
  public synchronized void close() throws IOException {
    if (!getConnection().isClosed()) {
      getConnection().close();
    }
  }

  /**
   * The put request will be buffered by its corresponding buffer queue. Returns false if the
   * queue is already full.
   * @param tableName the table to write to
   * @param put the Put to buffer
   * @return true if the request can be accepted by its corresponding buffer queue.
   */
  public boolean put(TableName tableName, final Put put) {
    return put(tableName, put, this.maxAttempts);
  }

  /**
   * The put requests will be buffered by their corresponding buffer queues. Returns the list of
   * puts which could not be queued.
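   * <p>
   * A minimal usage sketch, assuming an existing {@code multiplexer} and list of {@code puts}
   * (the table name is hypothetical); how rejected puts are handled is up to the caller:
   * <pre>
   * List&lt;Put&gt; rejected = multiplexer.put(TableName.valueOf("my_table"), puts);
   * if (rejected != null) {
   *   // the corresponding queues were full; back off and retry, or fall back to Table#put
   * }
   * </pre>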
   * @param tableName the table to write to
   * @param puts the Puts to buffer
   * @return the list of puts which could not be queued
   */
  public List<Put> put(TableName tableName, final List<Put> puts) {
    if (puts == null) {
      return null;
    }

    List<Put> failedPuts = null;
    boolean result;
    for (Put put : puts) {
      result = put(tableName, put, this.maxAttempts);
      if (!result) {
        // Create the failed puts list if necessary
        if (failedPuts == null) {
          failedPuts = new ArrayList<>();
        }
        // Add the put to the failed puts list
        failedPuts.add(put);
      }
    }
    return failedPuts;
  }

  /**
   * @deprecated Use {@link #put(TableName, List) } instead.
   */
  @Deprecated
  public List<Put> put(byte[] tableName, final List<Put> puts) {
    return put(TableName.valueOf(tableName), puts);
  }

  /**
   * The put request will be buffered by its corresponding buffer queue, and the put request will
   * be retried before dropping the request. Returns false if the queue is already full.
   * @param tableName the table to write to
   * @param put the Put to buffer
   * @param maxAttempts the maximum number of attempts before the put is dropped
   * @return true if the request can be accepted by its corresponding buffer queue.
   */
  public boolean put(final TableName tableName, final Put put, int maxAttempts) {
    if (maxAttempts <= 0) {
      return false;
    }

    try {
      ConnectionUtils.validatePut(put, maxKeyValueSize);
      // Allow mocking to get at the connection, but don't expose the connection to users.
      ClusterConnection conn = (ClusterConnection) getConnection();
      // AsyncProcess in the FlushWorker should take care of refreshing the location cache
      // as necessary. We shouldn't have to do that here.
      HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
      if (loc != null) {
        // Add the put pair into its corresponding queue.
        LinkedBlockingQueue<PutStatus> queue = getQueue(loc);

        // Generate a PutStatus object and offer it into the queue
        PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts);

        return queue.offer(s);
      }
    } catch (IOException e) {
      LOG.debug("Cannot process the put " + put, e);
    }
    return false;
  }

  /**
   * @deprecated Use {@link #put(TableName, Put) } instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, final Put put, int retry) {
    return put(TableName.valueOf(tableName), put, retry);
  }

  /**
   * @deprecated Use {@link #put(TableName, Put)} instead.
   */
  @Deprecated
  public boolean put(final byte[] tableName, Put put) {
    return put(TableName.valueOf(tableName), put);
  }

  /**
   * @return the current HTableMultiplexerStatus
   */
  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
  }

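  /**
   * Returns the buffer queue for the given region server location, lazily creating the
   * {@link FlushWorker} (and scheduling it at the configured flush period) on first use.
   * Double-checked locking ensures at most one worker is created per location.
   */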
  @VisibleForTesting
  LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
    FlushWorker worker = serverToFlushWorkerMap.get(addr);
    if (worker == null) {
      synchronized (this.serverToFlushWorkerMap) {
        worker = serverToFlushWorkerMap.get(addr);
        if (worker == null) {
          // Create the flush worker
          worker = new FlushWorker(workerConf, this.conn, addr, this,
              perRegionServerBufferQueueSize, pool, executor);
          this.serverToFlushWorkerMap.put(addr, worker);
          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
        }
      }
    }
    return worker.getQueue();
  }

  @VisibleForTesting
  ClusterConnection getConnection() {
    return this.conn;
  }

  /**
   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. It
   * reports the number of buffered requests and the number of failed (dropped) requests, either
   * in total or on a per-region-server basis.
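   * <p>
   * A minimal polling sketch, assuming an existing {@code multiplexer}:
   * <pre>
   * HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
   * long buffered = status.getTotalBufferedCounter();
   * long dropped = status.getTotalFailedCounter();
   * </pre>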
   * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use
   *             {@link BufferedMutator} for batching mutations.
   */
  @Deprecated
  @InterfaceAudience.Public
  public static class HTableMultiplexerStatus {
    private long totalFailedPutCounter;
    private long totalBufferedPutCounter;
    private long maxLatency;
    private long overallAverageLatency;
    private Map<String, Long> serverToFailedCounterMap;
    private Map<String, Long> serverToBufferedCounterMap;
    private Map<String, Long> serverToAverageLatencyMap;
    private Map<String, Long> serverToMaxLatencyMap;

    public HTableMultiplexerStatus(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      this.totalBufferedPutCounter = 0;
      this.totalFailedPutCounter = 0;
      this.maxLatency = 0;
      this.overallAverageLatency = 0;
      this.serverToBufferedCounterMap = new HashMap<>();
      this.serverToFailedCounterMap = new HashMap<>();
      this.serverToAverageLatencyMap = new HashMap<>();
      this.serverToMaxLatencyMap = new HashMap<>();
      this.initialize(serverToFlushWorkerMap);
    }

    private void initialize(Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
      if (serverToFlushWorkerMap == null) {
        return;
      }

      long averageCalcSum = 0;
      int averageCalcCount = 0;
      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap.entrySet()) {
        HRegionLocation addr = entry.getKey();
        FlushWorker worker = entry.getValue();

        long bufferedCounter = worker.getTotalBufferedCount();
        long failedCounter = worker.getTotalFailedCount();
        long serverMaxLatency = worker.getMaxLatency();
        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
        // Get sum and count pieces separately to compute overall average
        SimpleEntry<Long, Integer> averageComponents = averageCounter.getComponents();
        long serverAvgLatency = averageCounter.getAndReset();

        this.totalBufferedPutCounter += bufferedCounter;
        this.totalFailedPutCounter += failedCounter;
        if (serverMaxLatency > this.maxLatency) {
          this.maxLatency = serverMaxLatency;
        }
        averageCalcSum += averageComponents.getKey();
        averageCalcCount += averageComponents.getValue();

        this.serverToBufferedCounterMap.put(addr.getHostnamePort(), bufferedCounter);
        this.serverToFailedCounterMap.put(addr.getHostnamePort(), failedCounter);
        this.serverToAverageLatencyMap.put(addr.getHostnamePort(), serverAvgLatency);
        this.serverToMaxLatencyMap.put(addr.getHostnamePort(), serverMaxLatency);
      }
      this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum / averageCalcCount : 0;
    }

    public long getTotalBufferedCounter() {
      return this.totalBufferedPutCounter;
    }

    public long getTotalFailedCounter() {
      return this.totalFailedPutCounter;
    }

    public long getMaxLatency() {
      return this.maxLatency;
    }

    public long getOverallAverageLatency() {
      return this.overallAverageLatency;
    }

    public Map<String, Long> getBufferedCounterForEachRegionServer() {
      return this.serverToBufferedCounterMap;
    }

    public Map<String, Long> getFailedCounterForEachRegionServer() {
      return this.serverToFailedCounterMap;
    }

    public Map<String, Long> getMaxLatencyForEachRegionServer() {
      return this.serverToMaxLatencyMap;
    }

    public Map<String, Long> getAverageLatencyForEachRegionServer() {
      return this.serverToAverageLatencyMap;
    }
  }

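  /**
   * An immutable holder for one buffered put: the target region, the Put itself, and the number
   * of attempts remaining before the put is dropped.
   */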
  @VisibleForTesting
  static class PutStatus {
    final RegionInfo regionInfo;
    final Put put;
    final int maxAttemptCount;

    public PutStatus(RegionInfo regionInfo, Put put, int maxAttemptCount) {
      this.regionInfo = regionInfo;
      this.put = put;
      this.maxAttemptCount = maxAttemptCount;
    }
  }

  /**
   * Helper to compute the average of values added over an interval, until reset.
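   * <p>
   * A minimal lifecycle sketch:
   * <pre>
   * AtomicAverageCounter latency = new AtomicAverageCounter();
   * latency.add(12L);
   * latency.add(20L);
   * long avg = latency.getAndReset(); // 16; the counter then starts over
   * </pre>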
   */
  private static class AtomicAverageCounter {
    private long sum;
    private int count;

    public AtomicAverageCounter() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized long getAndReset() {
      long result = this.get();
      this.reset();
      return result;
    }

    public synchronized long get() {
      if (this.count == 0) {
        return 0;
      }
      return this.sum / this.count;
    }

    public synchronized SimpleEntry<Long, Integer> getComponents() {
      return new SimpleEntry<>(sum, count);
    }

    public synchronized void reset() {
      this.sum = 0L;
      this.count = 0;
    }

    public synchronized void add(long value) {
      this.sum += value;
      this.count++;
    }
  }

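  /**
   * A per-region-server worker that drains its buffer queue on a fixed schedule and flushes the
   * drained puts to that server as a single multi-action request. Puts that fail are rescheduled
   * with backoff via {@link #resubmitFailedPut(PutStatus, HRegionLocation)}.
   */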
  @VisibleForTesting
  static class FlushWorker implements Runnable {
    private final HRegionLocation addr;
    private final LinkedBlockingQueue<PutStatus> queue;
    private final HTableMultiplexer multiplexer;
    private final AtomicLong totalFailedPutCount = new AtomicLong(0);
    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
    private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
    private final AtomicLong maxLatency = new AtomicLong(0);

    private final AsyncProcess ap;
    private final List<PutStatus> processingList = new ArrayList<>();
    private final ScheduledExecutorService executor;
    private final int maxRetryInQueue;
    private final AtomicInteger retryInQueue = new AtomicInteger(0);
    private final int writeRpcTimeout; // needed when building each AsyncProcessTask
    private final int operationTimeout;
    private final ExecutorService pool;

    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
        HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
        ExecutorService pool, ScheduledExecutorService executor) {
      this.addr = addr;
      this.multiplexer = htableMultiplexer;
      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
      RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
      RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
      this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
              HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
      this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory);
      this.executor = executor;
      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
      this.pool = pool;
    }

    protected LinkedBlockingQueue<PutStatus> getQueue() {
      return this.queue;
    }

    public long getTotalFailedCount() {
      return totalFailedPutCount.get();
    }

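    /** @return the number of puts still queued plus the number currently being flushed. */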
    public long getTotalBufferedCount() {
      return (long) queue.size() + currentProcessingCount.get();
    }

    public AtomicAverageCounter getAverageLatencyCounter() {
      return this.averageLatency;
    }

    public long getMaxLatency() {
      return this.maxLatency.getAndSet(0);
    }

    boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
      // Decrease the retry count
      final int retryCount = ps.maxAttemptCount - 1;

      if (retryCount <= 0) {
        // Give up: no attempts left. The caller counts this put as failed.
        return false;
      }

      int cnt = getRetryInQueue().incrementAndGet();
      if (cnt > getMaxRetryInQueue()) {
        // Too many puts are already queued for resubmission; give up on this one.
        getRetryInQueue().decrementAndGet();
        return false;
      }

      final Put failedPut = ps.put;
      // The current put failed, so get the table name it was destined for.
      final TableName tableName = ps.regionInfo.getTable();

      long delayMs = getNextDelay(retryCount);
      if (LOG.isDebugEnabled()) {
        LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount);
      }

      // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating
      // the region location cache when the Put originally failed with some exception. If we keep
      // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff
      // that we expect it to.
      getExecutor().schedule(new Runnable() {
        @Override
        public void run() {
          boolean succ = false;
          try {
            succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
          } finally {
            FlushWorker.this.getRetryInQueue().decrementAndGet();
            if (!succ) {
              FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
            }
          }
        }
      }, delayMs, TimeUnit.MILLISECONDS);
      return true;
    }

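    /**
     * Computes the backoff delay before a resubmit. This reuses HBase's standard retry pause
     * schedule via {@link ConnectionUtils#getPauseTime(long, int)}, with the flush period as the
     * base pause, so delays grow with the number of attempts already used.
     */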
    @VisibleForTesting
    long getNextDelay(int retryCount) {
      return ConnectionUtils.getPauseTime(multiplexer.flushPeriod,
          multiplexer.maxAttempts - retryCount - 1);
    }

    @VisibleForTesting
    AtomicInteger getRetryInQueue() {
      return this.retryInQueue;
    }

    @VisibleForTesting
    int getMaxRetryInQueue() {
      return this.maxRetryInQueue;
    }

    @VisibleForTesting
    AtomicLong getTotalFailedPutCount() {
      return this.totalFailedPutCount;
    }

    @VisibleForTesting
    HTableMultiplexer getMultiplexer() {
      return this.multiplexer;
    }

    @VisibleForTesting
    ScheduledExecutorService getExecutor() {
      return this.executor;
    }

    @Override
    public void run() {
      int failedCount = 0;
      try {
        long start = EnvironmentEdgeManager.currentTime();

        // drain all the queued puts into the tmp list
        processingList.clear();
        queue.drainTo(processingList);
        if (processingList.isEmpty()) {
          // Nothing to flush
          return;
        }

        currentProcessingCount.set(processingList.size());
        // failedCount is decreased whenever a Put succeeds or is resubmitted.
        failedCount = processingList.size();

        List<Action> retainedActions = new ArrayList<>(processingList.size());
        MultiAction actions = new MultiAction();
        for (int i = 0; i < processingList.size(); i++) {
          PutStatus putStatus = processingList.get(i);
          Action action = new Action(putStatus.put, i);
          actions.add(putStatus.regionInfo.getRegionName(), action);
          retainedActions.add(action);
        }

        // Process this multi-put request
        List<PutStatus> failed = null;
        Object[] results = new Object[actions.size()];
        ServerName server = addr.getServerName();
        Map<ServerName, MultiAction> actionsByServer = Collections.singletonMap(server, actions);
        try {
          AsyncProcessTask task = AsyncProcessTask.newBuilder()
                  .setResults(results)
                  .setPool(pool)
                  .setRpcTimeout(writeRpcTimeout)
                  .setOperationTimeout(operationTimeout)
                  .build();
          AsyncRequestFuture arf =
              ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
          arf.waitUntilDone();
          if (arf.hasError()) {
            // We just log and ignore the exception here since failed Puts will be resubmitted.
            LOG.debug("Caught some exceptions when flushing puts to region server "
                + addr.getHostnamePort(), arf.getErrors());
          }
        } finally {
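          // A successful action leaves a Result in its slot; anything else (e.g. null or a
          // Throwable) marks a put that must be collected for resubmission.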
          for (int i = 0; i < results.length; i++) {
            if (results[i] instanceof Result) {
              failedCount--;
            } else {
              if (failed == null) {
                failed = new ArrayList<>();
              }
              failed.add(processingList.get(i));
            }
          }
        }

        if (failed != null) {
          // Resubmit failed puts
          for (PutStatus putStatus : failed) {
            if (resubmitFailedPut(putStatus, this.addr)) {
              failedCount--;
            }
          }
        }

        long elapsed = EnvironmentEdgeManager.currentTime() - start;
        // Update latency counters
        averageLatency.add(elapsed);
        if (elapsed > maxLatency.get()) {
          maxLatency.set(elapsed);
        }

        // Log some basic info
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processed " + currentProcessingCount.get() + " put requests for "
              + addr.getHostnamePort() + " and " + failedCount + " failed"
              + ", latency for this send: " + elapsed);
        }

        // Reset the current processing put count
        currentProcessingCount.set(0);
      } catch (RuntimeException e) {
        // To make findbugs happy
        // Log all the exceptions and move on
        LOG.debug(
          "Caught some exceptions when flushing puts to region server "
              + addr.getHostnamePort(), e);
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        // Log all the exceptions and move on
        LOG.debug(
          "Caught some exceptions when flushing puts to region server "
              + addr.getHostnamePort(), e);
      } finally {
        // Update the totalFailedCount
        this.totalFailedPutCount.addAndGet(failedCount);
      }
    }
  }
}