001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020package org.apache.hadoop.hbase.client; 021 022import java.io.IOException; 023import java.util.AbstractMap.SimpleEntry; 024import java.util.ArrayList; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.LinkedBlockingQueue; 033import java.util.concurrent.ScheduledExecutorService; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicLong; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. Each put 052 * will be sharded into different buffer queues based on its destination region server. So each 053 * region server buffer queue will only have the puts which share the same destination. And each 054 * queue will have a flush worker thread to flush the puts request to the region server. If any 055 * queue is full, the HTableMultiplexer starts to drop the Put requests for that particular queue. 056 * </p> 057 * Also all the puts will be retried as a configuration number before dropping. And the 058 * HTableMultiplexer can report the number of buffered requests and the number of the failed 059 * (dropped) requests in total or on per region server basis. 060 * <p/> 061 * This class is thread safe. 062 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use 063 * {@link BufferedMutator} for batching mutations. 064 */ 065@Deprecated 066@InterfaceAudience.Public 067public class HTableMultiplexer { 068 private static final Logger LOG = LoggerFactory.getLogger(HTableMultiplexer.class.getName()); 069 070 public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS = 071 "hbase.tablemultiplexer.flush.period.ms"; 072 public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads"; 073 public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE = 074 "hbase.client.max.retries.in.queue"; 075 076 /** The map between each region server to its flush worker */ 077 private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap = 078 new ConcurrentHashMap<>(); 079 080 private final Configuration workerConf; 081 private final ClusterConnection conn; 082 private final ExecutorService pool; 083 private final int maxAttempts; 084 private final int perRegionServerBufferQueueSize; 085 private final int maxKeyValueSize; 086 private final ScheduledExecutorService executor; 087 private final long flushPeriod; 088 089 /** 090 * @param conf The HBaseConfiguration 091 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for 092 * each region server before dropping the request. 093 */ 094 public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) 095 throws IOException { 096 this(ConnectionFactory.createConnection(conf), conf, perRegionServerBufferQueueSize); 097 } 098 099 /** 100 * @param conn The HBase connection. 101 * @param conf The HBase configuration 102 * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for 103 * each region server before dropping the request. 104 */ 105 public HTableMultiplexer(Connection conn, Configuration conf, 106 int perRegionServerBufferQueueSize) { 107 this.conn = (ClusterConnection) conn; 108 this.pool = HTable.getDefaultExecutor(conf); 109 // how many times we could try in total, one more than retry number 110 this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 111 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; 112 this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; 113 this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf); 114 this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100); 115 int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10); 116 this.executor = 117 Executors.newScheduledThreadPool(initThreads, 118 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build()); 119 120 this.workerConf = HBaseConfiguration.create(conf); 121 // We do not do the retry because we need to reassign puts to different queues if regions are 122 // moved. 123 this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 124 } 125 126 /** 127 * Closes the internal {@link Connection}. Does nothing if the {@link Connection} has already 128 * been closed. 129 * @throws IOException If there is an error closing the connection. 130 */ 131 public synchronized void close() throws IOException { 132 if (!getConnection().isClosed()) { 133 getConnection().close(); 134 } 135 } 136 137 /** 138 * The put request will be buffered by its corresponding buffer queue. Return false if the queue 139 * is already full. 140 * @param tableName 141 * @param put 142 * @return true if the request can be accepted by its corresponding buffer queue. 143 */ 144 public boolean put(TableName tableName, final Put put) { 145 return put(tableName, put, this.maxAttempts); 146 } 147 148 /** 149 * The puts request will be buffered by their corresponding buffer queue. 150 * Return the list of puts which could not be queued. 151 * @param tableName 152 * @param puts 153 * @return the list of puts which could not be queued 154 */ 155 public List<Put> put(TableName tableName, final List<Put> puts) { 156 if (puts == null) 157 return null; 158 159 List <Put> failedPuts = null; 160 boolean result; 161 for (Put put : puts) { 162 result = put(tableName, put, this.maxAttempts); 163 if (result == false) { 164 165 // Create the failed puts list if necessary 166 if (failedPuts == null) { 167 failedPuts = new ArrayList<>(); 168 } 169 // Add the put to the failed puts list 170 failedPuts.add(put); 171 } 172 } 173 return failedPuts; 174 } 175 176 /** 177 * @deprecated Use {@link #put(TableName, List) } instead. 178 */ 179 @Deprecated 180 public List<Put> put(byte[] tableName, final List<Put> puts) { 181 return put(TableName.valueOf(tableName), puts); 182 } 183 184 /** 185 * The put request will be buffered by its corresponding buffer queue. And the put request will be 186 * retried before dropping the request. 187 * Return false if the queue is already full. 188 * @return true if the request can be accepted by its corresponding buffer queue. 189 */ 190 public boolean put(final TableName tableName, final Put put, int maxAttempts) { 191 if (maxAttempts <= 0) { 192 return false; 193 } 194 195 try { 196 ConnectionUtils.validatePut(put, maxKeyValueSize); 197 // Allow mocking to get at the connection, but don't expose the connection to users. 198 ClusterConnection conn = (ClusterConnection) getConnection(); 199 // AsyncProcess in the FlushWorker should take care of refreshing the location cache 200 // as necessary. We shouldn't have to do that here. 201 HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false); 202 if (loc != null) { 203 // Add the put pair into its corresponding queue. 204 LinkedBlockingQueue<PutStatus> queue = getQueue(loc); 205 206 // Generate a MultiPutStatus object and offer it into the queue 207 PutStatus s = new PutStatus(loc.getRegion(), put, maxAttempts); 208 209 return queue.offer(s); 210 } 211 } catch (IOException e) { 212 LOG.debug("Cannot process the put " + put, e); 213 } 214 return false; 215 } 216 217 /** 218 * @deprecated Use {@link #put(TableName, Put) } instead. 219 */ 220 @Deprecated 221 public boolean put(final byte[] tableName, final Put put, int retry) { 222 return put(TableName.valueOf(tableName), put, retry); 223 } 224 225 /** 226 * @deprecated Use {@link #put(TableName, Put)} instead. 227 */ 228 @Deprecated 229 public boolean put(final byte[] tableName, Put put) { 230 return put(TableName.valueOf(tableName), put); 231 } 232 233 /** 234 * @return the current HTableMultiplexerStatus 235 */ 236 public HTableMultiplexerStatus getHTableMultiplexerStatus() { 237 return new HTableMultiplexerStatus(serverToFlushWorkerMap); 238 } 239 240 @InterfaceAudience.Private 241 LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) { 242 FlushWorker worker = serverToFlushWorkerMap.get(addr); 243 if (worker == null) { 244 synchronized (this.serverToFlushWorkerMap) { 245 worker = serverToFlushWorkerMap.get(addr); 246 if (worker == null) { 247 // Create the flush worker 248 worker = new FlushWorker(workerConf, this.conn, addr, this, 249 perRegionServerBufferQueueSize, pool, executor); 250 this.serverToFlushWorkerMap.put(addr, worker); 251 executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS); 252 } 253 } 254 } 255 return worker.getQueue(); 256 } 257 258 @InterfaceAudience.Private 259 ClusterConnection getConnection() { 260 return this.conn; 261 } 262 263 /** 264 * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. report the 265 * number of buffered requests and the number of the failed (dropped) requests in total or on per 266 * region server basis. 267 * @deprecated since 2.2.0, will be removed in 3.0.0, without replacement. Please use 268 * {@link BufferedMutator} for batching mutations. 269 */ 270 @Deprecated 271 @InterfaceAudience.Public 272 public static class HTableMultiplexerStatus { 273 private long totalFailedPutCounter; 274 private long totalBufferedPutCounter; 275 private long maxLatency; 276 private long overallAverageLatency; 277 private Map<String, Long> serverToFailedCounterMap; 278 private Map<String, Long> serverToBufferedCounterMap; 279 private Map<String, Long> serverToAverageLatencyMap; 280 private Map<String, Long> serverToMaxLatencyMap; 281 282 public HTableMultiplexerStatus( 283 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) { 284 this.totalBufferedPutCounter = 0; 285 this.totalFailedPutCounter = 0; 286 this.maxLatency = 0; 287 this.overallAverageLatency = 0; 288 this.serverToBufferedCounterMap = new HashMap<>(); 289 this.serverToFailedCounterMap = new HashMap<>(); 290 this.serverToAverageLatencyMap = new HashMap<>(); 291 this.serverToMaxLatencyMap = new HashMap<>(); 292 this.initialize(serverToFlushWorkerMap); 293 } 294 295 private void initialize( 296 Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) { 297 if (serverToFlushWorkerMap == null) { 298 return; 299 } 300 301 long averageCalcSum = 0; 302 int averageCalcCount = 0; 303 for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap 304 .entrySet()) { 305 HRegionLocation addr = entry.getKey(); 306 FlushWorker worker = entry.getValue(); 307 308 long bufferedCounter = worker.getTotalBufferedCount(); 309 long failedCounter = worker.getTotalFailedCount(); 310 long serverMaxLatency = worker.getMaxLatency(); 311 AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter(); 312 // Get sum and count pieces separately to compute overall average 313 SimpleEntry<Long, Integer> averageComponents = averageCounter 314 .getComponents(); 315 long serverAvgLatency = averageCounter.getAndReset(); 316 317 this.totalBufferedPutCounter += bufferedCounter; 318 this.totalFailedPutCounter += failedCounter; 319 if (serverMaxLatency > this.maxLatency) { 320 this.maxLatency = serverMaxLatency; 321 } 322 averageCalcSum += averageComponents.getKey(); 323 averageCalcCount += averageComponents.getValue(); 324 325 this.serverToBufferedCounterMap.put(addr.getHostnamePort(), 326 bufferedCounter); 327 this.serverToFailedCounterMap 328 .put(addr.getHostnamePort(), 329 failedCounter); 330 this.serverToAverageLatencyMap.put(addr.getHostnamePort(), 331 serverAvgLatency); 332 this.serverToMaxLatencyMap 333 .put(addr.getHostnamePort(), 334 serverMaxLatency); 335 } 336 this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum 337 / averageCalcCount : 0; 338 } 339 340 public long getTotalBufferedCounter() { 341 return this.totalBufferedPutCounter; 342 } 343 344 public long getTotalFailedCounter() { 345 return this.totalFailedPutCounter; 346 } 347 348 public long getMaxLatency() { 349 return this.maxLatency; 350 } 351 352 public long getOverallAverageLatency() { 353 return this.overallAverageLatency; 354 } 355 356 public Map<String, Long> getBufferedCounterForEachRegionServer() { 357 return this.serverToBufferedCounterMap; 358 } 359 360 public Map<String, Long> getFailedCounterForEachRegionServer() { 361 return this.serverToFailedCounterMap; 362 } 363 364 public Map<String, Long> getMaxLatencyForEachRegionServer() { 365 return this.serverToMaxLatencyMap; 366 } 367 368 public Map<String, Long> getAverageLatencyForEachRegionServer() { 369 return this.serverToAverageLatencyMap; 370 } 371 } 372 373 @InterfaceAudience.Private 374 static class PutStatus { 375 final RegionInfo regionInfo; 376 final Put put; 377 final int maxAttempCount; 378 379 public PutStatus(RegionInfo regionInfo, Put put, int maxAttempCount) { 380 this.regionInfo = regionInfo; 381 this.put = put; 382 this.maxAttempCount = maxAttempCount; 383 } 384 } 385 386 /** 387 * Helper to count the average over an interval until reset. 388 */ 389 private static class AtomicAverageCounter { 390 private long sum; 391 private int count; 392 393 public AtomicAverageCounter() { 394 this.sum = 0L; 395 this.count = 0; 396 } 397 398 public synchronized long getAndReset() { 399 long result = this.get(); 400 this.reset(); 401 return result; 402 } 403 404 public synchronized long get() { 405 if (this.count == 0) { 406 return 0; 407 } 408 return this.sum / this.count; 409 } 410 411 public synchronized SimpleEntry<Long, Integer> getComponents() { 412 return new SimpleEntry<>(sum, count); 413 } 414 415 public synchronized void reset() { 416 this.sum = 0L; 417 this.count = 0; 418 } 419 420 public synchronized void add(long value) { 421 this.sum += value; 422 this.count++; 423 } 424 } 425 426 @InterfaceAudience.Private 427 static class FlushWorker implements Runnable { 428 private final HRegionLocation addr; 429 private final LinkedBlockingQueue<PutStatus> queue; 430 private final HTableMultiplexer multiplexer; 431 private final AtomicLong totalFailedPutCount = new AtomicLong(0); 432 private final AtomicInteger currentProcessingCount = new AtomicInteger(0); 433 private final AtomicAverageCounter averageLatency = new AtomicAverageCounter(); 434 private final AtomicLong maxLatency = new AtomicLong(0); 435 436 private final AsyncProcess ap; 437 private final List<PutStatus> processingList = new ArrayList<>(); 438 private final ScheduledExecutorService executor; 439 private final int maxRetryInQueue; 440 private final AtomicInteger retryInQueue = new AtomicInteger(0); 441 private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor 442 private final int operationTimeout; 443 private final ExecutorService pool; 444 public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, 445 HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, 446 ExecutorService pool, ScheduledExecutorService executor) { 447 this.addr = addr; 448 this.multiplexer = htableMultiplexer; 449 this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); 450 RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); 451 RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); 452 this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 453 conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 454 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 455 this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 456 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); 457 this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory); 458 this.executor = executor; 459 this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); 460 this.pool = pool; 461 } 462 463 protected LinkedBlockingQueue<PutStatus> getQueue() { 464 return this.queue; 465 } 466 467 public long getTotalFailedCount() { 468 return totalFailedPutCount.get(); 469 } 470 471 public long getTotalBufferedCount() { 472 return (long) queue.size() + currentProcessingCount.get(); 473 } 474 475 public AtomicAverageCounter getAverageLatencyCounter() { 476 return this.averageLatency; 477 } 478 479 public long getMaxLatency() { 480 return this.maxLatency.getAndSet(0); 481 } 482 483 boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException { 484 // Decrease the retry count 485 final int retryCount = ps.maxAttempCount - 1; 486 487 if (retryCount <= 0) { 488 // Update the failed counter and no retry any more. 489 return false; 490 } 491 492 int cnt = getRetryInQueue().incrementAndGet(); 493 if (cnt > getMaxRetryInQueue()) { 494 // Too many Puts in queue for resubmit, give up this 495 getRetryInQueue().decrementAndGet(); 496 return false; 497 } 498 499 final Put failedPut = ps.put; 500 // The currentPut is failed. So get the table name for the currentPut. 501 final TableName tableName = ps.regionInfo.getTable(); 502 503 long delayMs = getNextDelay(retryCount); 504 if (LOG.isDebugEnabled()) { 505 LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); 506 } 507 508 // HBASE-12198, HBASE-15221, HBASE-15232: AsyncProcess should be responsible for updating 509 // the region location cache when the Put original failed with some exception. If we keep 510 // re-trying the same Put to the same location, AsyncProcess isn't doing the right stuff 511 // that we expect it to. 512 getExecutor().schedule(new Runnable() { 513 @Override 514 public void run() { 515 boolean succ = false; 516 try { 517 succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount); 518 } finally { 519 FlushWorker.this.getRetryInQueue().decrementAndGet(); 520 if (!succ) { 521 FlushWorker.this.getTotalFailedPutCount().incrementAndGet(); 522 } 523 } 524 } 525 }, delayMs, TimeUnit.MILLISECONDS); 526 return true; 527 } 528 529 @InterfaceAudience.Private 530 long getNextDelay(int retryCount) { 531 return ConnectionUtils.getPauseTime(multiplexer.flushPeriod, 532 multiplexer.maxAttempts - retryCount - 1); 533 } 534 535 @InterfaceAudience.Private 536 AtomicInteger getRetryInQueue() { 537 return this.retryInQueue; 538 } 539 540 @InterfaceAudience.Private 541 int getMaxRetryInQueue() { 542 return this.maxRetryInQueue; 543 } 544 545 @InterfaceAudience.Private 546 AtomicLong getTotalFailedPutCount() { 547 return this.totalFailedPutCount; 548 } 549 550 @InterfaceAudience.Private 551 HTableMultiplexer getMultiplexer() { 552 return this.multiplexer; 553 } 554 555 @InterfaceAudience.Private 556 ScheduledExecutorService getExecutor() { 557 return this.executor; 558 } 559 560 @Override 561 public void run() { 562 int failedCount = 0; 563 try { 564 long start = EnvironmentEdgeManager.currentTime(); 565 566 // drain all the queued puts into the tmp list 567 processingList.clear(); 568 queue.drainTo(processingList); 569 if (processingList.isEmpty()) { 570 // Nothing to flush 571 return; 572 } 573 574 currentProcessingCount.set(processingList.size()); 575 // failedCount is decreased whenever a Put is success or resubmit. 576 failedCount = processingList.size(); 577 578 List<Action> retainedActions = new ArrayList<>(processingList.size()); 579 MultiAction actions = new MultiAction(); 580 for (int i = 0; i < processingList.size(); i++) { 581 PutStatus putStatus = processingList.get(i); 582 Action action = new Action(putStatus.put, i); 583 actions.add(putStatus.regionInfo.getRegionName(), action); 584 retainedActions.add(action); 585 } 586 587 // Process this multi-put request 588 List<PutStatus> failed = null; 589 Object[] results = new Object[actions.size()]; 590 ServerName server = addr.getServerName(); 591 Map<ServerName, MultiAction> actionsByServer = 592 Collections.singletonMap(server, actions); 593 try { 594 AsyncProcessTask task = AsyncProcessTask.newBuilder() 595 .setResults(results) 596 .setPool(pool) 597 .setRpcTimeout(writeRpcTimeout) 598 .setOperationTimeout(operationTimeout) 599 .build(); 600 AsyncRequestFuture arf = 601 ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer); 602 arf.waitUntilDone(); 603 if (arf.hasError()) { 604 // We just log and ignore the exception here since failed Puts will be resubmit again. 605 LOG.debug("Caught some exceptions when flushing puts to region server " 606 + addr.getHostnamePort(), arf.getErrors()); 607 } 608 } finally { 609 for (int i = 0; i < results.length; i++) { 610 if (results[i] instanceof Result) { 611 failedCount--; 612 } else { 613 if (failed == null) { 614 failed = new ArrayList<>(); 615 } 616 failed.add(processingList.get(i)); 617 } 618 } 619 } 620 621 if (failed != null) { 622 // Resubmit failed puts 623 for (PutStatus putStatus : failed) { 624 if (resubmitFailedPut(putStatus, this.addr)) { 625 failedCount--; 626 } 627 } 628 } 629 630 long elapsed = EnvironmentEdgeManager.currentTime() - start; 631 // Update latency counters 632 averageLatency.add(elapsed); 633 if (elapsed > maxLatency.get()) { 634 maxLatency.set(elapsed); 635 } 636 637 // Log some basic info 638 if (LOG.isDebugEnabled()) { 639 LOG.debug("Processed " + currentProcessingCount + " put requests for " 640 + addr.getHostnamePort() + " and " + failedCount + " failed" 641 + ", latency for this send: " + elapsed); 642 } 643 644 // Reset the current processing put count 645 currentProcessingCount.set(0); 646 } catch (RuntimeException e) { 647 // To make findbugs happy 648 // Log all the exceptions and move on 649 LOG.debug( 650 "Caught some exceptions " + e + " when flushing puts to region server " 651 + addr.getHostnamePort(), e); 652 } catch (Exception e) { 653 if (e instanceof InterruptedException) { 654 Thread.currentThread().interrupt(); 655 } 656 // Log all the exceptions and move on 657 LOG.debug( 658 "Caught some exceptions " + e + " when flushing puts to region server " 659 + addr.getHostnamePort(), e); 660 } finally { 661 // Update the totalFailedCount 662 this.totalFailedPutCount.addAndGet(failedCount); 663 } 664 } 665 } 666}