001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020package org.apache.hadoop.hbase.client; 021 022import java.io.IOException; 023import java.util.AbstractMap.SimpleEntry; 024import java.util.ArrayList; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.LinkedBlockingQueue; 033import java.util.concurrent.ScheduledExecutorService; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicLong; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 050import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 051 052/** 053 * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. Each put 054 * will be sharded into different buffer queues based on its destination region server. So each 055 * region server buffer queue will only have the puts which share the same destination. And each 056 * queue will have a flush worker thread to flush the puts request to the region server. If any 057 * queue is full, the HTableMultiplexer starts to drop the Put requests for that particular queue. 058 * </p> 059 * Also all the puts will be retried as a configuration number before dropping. And the 060 * HTableMultiplexer can report the number of buffered requests and the number of the failed 061 * (dropped) requests in total or on per region server basis. 062 * <p/> 063 * This class is thread safe. 064 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use 065 * {@link BufferedMutator} for batching mutations. 066 */ 067@Deprecated 068@InterfaceAudience.Public 069public class HTableMultiplexer { 070 private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName()); 071 072 public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS = 073 "hbase.tablemultiplexer.flush.period.ms"; 074 public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads"; 075 public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE = 076 "hbase.client.max.retries.in.queue"; 077 078 /** The map between each region server to its flush worker */ 079 private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap = 080 new ConcurrentHashMap<>(); 081 082 private final Configuration workerConf; 083 private final ClusterConnection conn; 084 private final ExecutorService pool; 085 private final int maxAttempts; 086 private final int perRegionServerBufferQueueSize; 087 private final int maxKeyValueSize; 088 private final ScheduledExecutorService executor; 089 private final long flushPeriod; 090 091 /** 092 * @param conf The HBaseConfiguration 093 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for 094 * each region server before dropping the request. 095 */ 096 public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) 097 throws IOException { 098 this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize); 099 } 100 101 /** 102 * @param conn The HBase connection. 103 * @param conf The HBase configuration 104 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for 105 * each region server before dropping the request. 106 */ 107 public HTableMultiplexer(Connection conn, Configuration conf, 108 int perRegionServerBufferQueueSize) { 109 this.conn = (ClusterConnection) conn; 110 this.pool = HTable.getDefaultExecutor(conf); 111 // how many times we could try in total, one more than retry number 112 this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 113 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; 114 this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; 115 this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf); 116 this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100); 117 int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10); 118 this.executor = 119 Executors.newScheduledThreadPool(initThreads, 120 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build()); 121 122 this.workerConf = HBaseConfiguration.create(conf); 123 // We do not do the retry because we need to reassign puts to different queues if regions are 124 // moved. 125 this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 126 } 127 128 /** 129 * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already 130 * been closed. 131 * @throws IOException If there is an error closing the connection. 132 */ 133 public synchronized void close() throws IOException { 134 if (!getConnection().isClosed()) { 135 getConnection().close(); 136 } 137 } 138 139 /** 140 * The put request will be buffered by its corresponding buffer queue. Return false if the queue 141 * is already full. 142 * @param tableName 143 * @param put 144 * @return true if the request can be accepted by its corresponding buffer queue. 145 */ 146 public boolean put(TableName tableName, final Put put) { 147 return put(tableName, put, this.maxAttempts); 148 } 149 150 /** 151 * The puts request will be buffered by their corresponding buffer queue. 152 * Return the list of puts which could not be queued. 153 * @param tableName 154 * @param puts 155 * @return the list of puts which could not be queued 156 */ 157 public List<Put> put(TableName tableName, final List<Put> puts) { 158 if (puts == null) 159 return null; 160 161 List <Put> failedPuts = null; 162 boolean result; 163 for (Put put : puts) { 164 result = put(tableName, put, this.maxAttempts); 165 if (result == false) { 166 167 // Create the failed puts list if necessary 168 if (failedPuts == null) { 169 failedPuts = new ArrayList<>(); 170 } 171 // Add the put to the failed puts list 172 failedPuts.add(put); 173 } 174 } 175 return failedPuts; 176 } 177 178 /** 179 * @deprecated Use {@link #put(TableName, List) } instead. 180 */ 181 @Deprecated 182 public List<Put> put(byte[] tableName, final List<Put> puts) { 183 return put(TableName.valueOf(tableName), puts); 184 } 185 186 /** 187 * The put request will be buffered by its corresponding buffer queue. And the put request will be 188 * retried before dropping the request. 189 * Return false if the queue is already full. 190 * @return true if the request can be accepted by its corresponding buffer queue. 191 */ 192 public boolean put(final TableName tableName, final Put put, int maxAttempts) { 193 if (maxAttempts <= 0) { 194 return false; 195 } 196 197 try { 198 ConnectionUtils.validatePut(put, maxKeyValueSize); 199 // Allow mocking to get at the connection, but don't expose the connection to users. 200 ClusterConnection conn = (ClusterConnection) getConnection(); 201 // AsyncProcess in the FlushWorker should take care of refreshing the location cache 202 // as necessary. We shouldn't have to do that here. 203 HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); 204 if (loc != null) { 205 // Add the put pair into its corresponding queue. 206 LinkedBlockingQueue<PutStatus> queue = getQueue(loc); 207 208 // Generate a MultiPutStatus object and offer it into the queue 209 PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts); 210 211 return queue.offer(s); 212 } 213 } catch (IOException e) { 214 LOG.debug("Cannot process the put " + put, e); 215 } 216 return false; 217 } 218 219 /** 220 * @deprecated Use {@link #put(TableName, Put) } instead. 221 */ 222 @Deprecated 223 public boolean put(final byte[] tableName, final Put put, int retry) { 224 return put(TableName.valueOf(tableName), put, retry); 225 } 226 227 /** 228 * @deprecated Use {@link #put(TableName, Put)} instead. 229 */ 230 @Deprecated 231 public boolean put(final byte[] tableName, Put put) { 232 return put(TableName.valueOf(tableName), put); 233 } 234 235 /** 236 * @return the current HTableMultiplexerStatus 237 */ 238 public HTableMultiplexerStatus getHTableMultiplexerStatus() { 239 return new HTableMultiplexerStatus(serverToFlushWorkerMap); 240 } 241 242 @VisibleForTesting 243 LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) { 244 FlushWorker worker = serverToFlushWorkerMap.get(addr); 245 if (worker == null) { 246 synchronized (this.serverToFlushWorkerMap) { 247 worker = serverToFlushWorkerMap.get(addr); 248 if (worker == null) { 249 // Create the flush worker 250 worker = new FlushWorker(workerConf, this.conn, addr, this, 251 perRegionServerBufferQueueSize, pool, executor); 252 this.serverToFlushWorkerMap.put(addr, worker); 253 executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS); 254 } 255 } 256 } 257 return worker.getQueue(); 258 } 259 260 @VisibleForTesting 261 ClusterConnection getConnection() { 262 return this.conn; 263 } 264 265 /** 266 * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. report the 267 * number of buffered requests and the number of the failed (dropped) requests in total or on per 268 * region server basis. 269 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use 270 * {@link BufferedMutator} for batching mutations. 271 */ 272 @Deprecated 273 @InterfaceAudience.Public 274 public static class HTableMultiplexerStatus { 275 private long totalFailedPutCounter; 276 private long totalBufferedPutCounter; 277 private long maxLatency; 278 private long overallAverageLatency; 279 private Map<String, Long> serverToFailedCounterMap; 280 private Map<String, Long> serverToBufferedCounterMap; 281 private Map<String, Long> serverToAverageLatencyMap; 282 private Map<String, Long> serverToMaxLatencyMap; 283 284 public HTableMultiplexerStatus( 285 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) { 286 this.totalBufferedPutCounter = 0; 287 this.totalFailedPutCounter = 0; 288 this.maxLatency = 0; 289 this.overallAverageLatency = 0; 290 this.serverToBufferedCounterMap = new HashMap<>(); 291 this.serverToFailedCounterMap = new HashMap<>(); 292 this.serverToAverageLatencyMap = new HashMap<>(); 293 this.serverToMaxLatencyMap = new HashMap<>(); 294 this.initialize(serverToFlushWorkerMap); 295 } 296 297 private void initialize( 298 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) { 299 if (serverToFlushWorkerMap == null) { 300 return; 301 } 302 303 long averageCalcSum = 0; 304 int averageCalcCount = 0; 305 for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap 306 .entrySet()) { 307 HRegionLocation addr = entry.getKey(); 308 FlushWorker worker = entry.getValue(); 309 310 long bufferedCounter = worker.getTotalBufferedCount(); 311 long failedCounter = worker.getTotalFailedCount(); 312 long serverMaxLatency = worker.getMaxLatency(); 313 AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter(); 314 // Get sum and count pieces separately to compute overall average 315 SimpleEntry<Long, Integer> averageComponents = averageCounter 316 .getComponents(); 317 long serverAvgLatency = averageCounter.getAndReset(); 318 319 this.totalBufferedPutCounter += bufferedCounter; 320 this.totalFailedPutCounter += failedCounter; 321 if (serverMaxLatency > this.maxLatency) { 322 this.maxLatency = serverMaxLatency; 323 } 324 averageCalcSum += averageComponents.getKey(); 325 averageCalcCount += averageComponents.getValue(); 326 327 this.serverToBufferedCounterMap.put(addr.getHostnamePort(), 328 bufferedCounter); 329 this.serverToFailedCounterMap 330 .put(addr.getHostnamePort(), 331 failedCounter); 332 this.serverToAverageLatencyMap.put(addr.getHostnamePort(), 333 serverAvgLatency); 334 this.serverToMaxLatencyMap 335 .put(addr.getHostnamePort(), 336 serverMaxLatency); 337 } 338 this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum 339 / averageCalcCount : 0; 340 } 341 342 public long getTotalBufferedCounter() { 343 return this.totalBufferedPutCounter; 344 } 345 346 public long getTotalFailedCounter() { 347 return this.totalFailedPutCounter; 348 } 349 350 public long getMaxLatency() { 351 return this.maxLatency; 352 } 353 354 public long getOverallAverageLatency() { 355 return this.overallAverageLatency; 356 } 357 358 public Map<String, Long> getBufferedCounterForEachRegionServer() { 359 return this.serverToBufferedCounterMap; 360 } 361 362 public Map<String, Long> getFailedCounterForEachRegionServer() { 363 return this.serverToFailedCounterMap; 364 } 365 366 public Map<String, Long> getMaxLatencyForEachRegionServer() { 367 return this.serverToMaxLatencyMap; 368 } 369 370 public Map<String, Long> getAverageLatencyForEachRegionServer() { 371 return this.serverToAverageLatencyMap; 372 } 373 } 374 375 @VisibleForTesting 376 static class PutStatus { 377 final RegionInfo regionInfo; 378 final Put put; 379 final int maxAttempCount; 380 381 public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) { 382 this.regionInfo = regionInfo; 383 this.put = put; 384 this.maxAttempCount = maxAttempCount; 385 } 386 } 387 388 /** 389 * Helper to count the average over an interval until reset. 390 */ 391 private static class AtomicAverageCounter { 392 private long sum; 393 private int count; 394 395 public AtomicAverageCounter() { 396 this.sum = 0L; 397 this.count = 0; 398 } 399 400 public synchronized long getAndReset() { 401 long result = this.get(); 402 this.reset(); 403 return result; 404 } 405 406 public synchronized long get() { 407 if (this.count == 0) { 408 return 0; 409 } 410 return this.sum / this.count; 411 } 412 413 public synchronized SimpleEntry<Long, Integer> getComponents() { 414 return new SimpleEntry<>(sum, count); 415 } 416 417 public synchronized void reset() { 418 this.sum = 0L; 419 this.count = 0; 420 } 421 422 public synchronized void add(long value) { 423 this.sum += value; 424 this.count++; 425 } 426 } 427 428 @VisibleForTesting 429 static class FlushWorker implements Runnable { 430 private final HRegionLocation addr; 431 private final LinkedBlockingQueue<PutStatus> queue; 432 private final HTableMultiplexer multiplexer; 433 private final AtomicLong totalFailedPutCount = new AtomicLong(0); 434 private final AtomicInteger currentProcessingCount = new AtomicInteger(0); 435 private final AtomicAverageCounter averageLatency = new AtomicAverageCounter(); 436 private final AtomicLong maxLatency = new AtomicLong(0); 437 438 private final AsyncProcess ap; 439 private final List<PutStatus> processingList = new ArrayList<>(); 440 private final ScheduledExecutorService executor; 441 private final int maxRetryInQueue; 442 private final AtomicInteger retryInQueue = new AtomicInteger(0); 443 private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor 444 private final int operationTimeout; 445 private final ExecutorService pool; 446 public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, 447 HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, 448 ExecutorService pool, ScheduledExecutorService executor) { 449 this.addr = addr; 450 this.multiplexer = htableMultiplexer; 451 this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); 452 RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); 453 RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); 454 this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 455 conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 456 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 457 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 458 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 459 this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory); 460 this.executor = executor; 461 this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); 462 this.pool = pool; 463 } 464 465 protected LinkedBlockingQueue<PutStatus> getQueue() { 466 return this.queue; 467 } 468 469 public long getTotalFailedCount() { 470 return totalFailedPutCount.get(); 471 } 472 473 public long getTotalBufferedCount() { 474 return (long) queue.size() + currentProcessingCount.get(); 475 } 476 477 public AtomicAverageCounter getAverageLatencyCounter() { 478 return this.averageLatency; 479 } 480 481 public long getMaxLatency() { 482 return this.maxLatency.getAndSet(0); 483 } 484 485 boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException { 486 // Decrease the retry count 487 final int retryCount = ps.maxAttempCount - 1; 488 489 if (retryCount <= 0) { 490 // Update the failed counter and no retry any more. 491 return false; 492 } 493 494 int cnt = getRetryInQueue().incrementAndGet(); 495 if (cnt > getMaxRetryInQueue()) { 496 // Too many Puts in queue for resubmit, give up this 497 getRetryInQueue().decrementAndGet(); 498 return false; 499 } 500 501 final Put failedPut = ps.put; 502 // The currentPut is failed. So get the table name for the currentPut. 503 final TableName tableName = ps.regionInfo.getTable(); 504 505 long delayMs = getNextDelay(retryCount); 506 if (LOG.isDebugEnabled()) { 507 LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); 508 } 509 510 // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating 511 // the region location cache when the Put original failed with some exception. If we keep 512 // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff 513 // that we expect it to. 514 getExecutor().schedule(new Runnable() { 515 @Override 516 public void run() { 517 boolean succ = false; 518 try { 519 succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount); 520 } finally { 521 FlushWorker.this.getRetryInQueue().decrementAndGet(); 522 if (!succ) { 523 FlushWorker.this.getTotalFailedPutCount().incrementAndGet(); 524 } 525 } 526 } 527 }, delayMs, TimeUnit.MILLISECONDS); 528 return true; 529 } 530 531 @VisibleForTesting 532 long getNextDelay(int retryCount) { 533 return ConnectionUtils.getPauseTime(multiplexer.flushPeriod, 534 multiplexer.maxAttempts - retryCount - 1); 535 } 536 537 @VisibleForTesting 538 AtomicInteger getRetryInQueue() { 539 return this.retryInQueue; 540 } 541 542 @VisibleForTesting 543 int getMaxRetryInQueue() { 544 return this.maxRetryInQueue; 545 } 546 547 @VisibleForTesting 548 AtomicLong getTotalFailedPutCount() { 549 return this.totalFailedPutCount; 550 } 551 552 @VisibleForTesting 553 HTableMultiplexer getMultiplexer() { 554 return this.multiplexer; 555 } 556 557 @VisibleForTesting 558 ScheduledExecutorService getExecutor() { 559 return this.executor; 560 } 561 562 @Override 563 public void run() { 564 int failedCount = 0; 565 try { 566 long start = EnvironmentEdgeManager.currentTime(); 567 568 // drain all the queued puts into the tmp list 569 processingList.clear(); 570 queue.drainTo(processingList); 571 if (processingList.isEmpty()) { 572 // Nothing to flush 573 return; 574 } 575 576 currentProcessingCount.set(processingList.size()); 577 // failedCount is decreased whenever a Put is success or resubmit. 578 failedCount = processingList.size(); 579 580 List<Action> retainedActions = new ArrayList<>(processingList.size()); 581 MultiAction actions = new MultiAction(); 582 for (int i = 0; i < processingList.size(); i++) { 583 PutStatus putStatus = processingList.get(i); 584 Action action = new Action(putStatus.put, i); 585 actions.add(putStatus.regionInfo.getRegionName(), action); 586 retainedActions.add(action); 587 } 588 589 // Process this multi-put request 590 List<PutStatus> failed = null; 591 Object[] results = new Object[actions.size()]; 592 ServerName server = addr.getServerName(); 593 Map<ServerName, MultiAction> actionsByServer = 594 Collections.singletonMap(server, actions); 595 try { 596 AsyncProcessTask task = AsyncProcessTask.newBuilder() 597 .setResults(results) 598 .setPool(pool) 599 .setRpcTimeout(writeRpcTimeout) 600 .setOperationTimeout(operationTimeout) 601 .build(); 602 AsyncRequestFuture arf = 603 ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer); 604 arf.waitUntilDone(); 605 if (arf.hasError()) { 606 // We just log and ignore the exception here since failed Puts will be resubmit again. 607 LOG.debug("Caught some exceptions when flushing puts to region server " 608 + addr.getHostnamePort(), arf.getErrors()); 609 } 610 } finally { 611 for (int i = 0; i < results.length; i++) { 612 if (results[i] instanceof Result) { 613 failedCount--; 614 } else { 615 if (failed == null) { 616 failed = new ArrayList<>(); 617 } 618 failed.add(processingList.get(i)); 619 } 620 } 621 } 622 623 if (failed != null) { 624 // Resubmit failed puts 625 for (PutStatus putStatus : failed) { 626 if (resubmitFailedPut(putStatus, this.addr)) { 627 failedCount--; 628 } 629 } 630 } 631 632 long elapsed = EnvironmentEdgeManager.currentTime() - start; 633 // Update latency counters 634 averageLatency.add(elapsed); 635 if (elapsed > maxLatency.get()) { 636 maxLatency.set(elapsed); 637 } 638 639 // Log some basic info 640 if (LOG.isDebugEnabled()) { 641 LOG.debug("Processed " + currentProcessingCount + " put requests for " 642 + addr.getHostnamePort() + " and " + failedCount + " failed" 643 + ", latency for this send: " + elapsed); 644 } 645 646 // Reset the current processing put count 647 currentProcessingCount.set(0); 648 } catch (RuntimeException e) { 649 // To make findbugs happy 650 // Log all the exceptions and move on 651 LOG.debug( 652 "Caught some exceptions " + e + " when flushing puts to region server " 653 + addr.getHostnamePort(), e); 654 } catch (Exception e) { 655 if (e instanceof InterruptedException) { 656 Thread.currentThread().interrupt(); 657 } 658 // Log all the exceptions and move on 659 LOG.debug( 660 "Caught some exceptions " + e + " when flushing puts to region server " 661 + addr.getHostnamePort(), e); 662 } finally { 663 // Update the totalFailedCount 664 this.totalFailedPutCount.addAndGet(failedCount); 665 } 666 } 667 } 668}