/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * For the slave cluster it selects a random subset of region servers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  private static final Logger LOG =
      LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;

  /** Drop edits for tables that have been deleted from the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
      "hbase.replication.drop.on.deleted.table";
  /** Drop edits for CFs that have been deleted from the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY =
      "hbase.replication.drop.on.deleted.columnfamily";

  private ClusterConnection conn;
  private Configuration localConf;
  private Configuration conf;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Amount of time for shutdown to wait for all tasks to complete
  private long maxTerminationWait;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private ThreadPoolExecutor exec;
  private int maxThreads;
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;
  private Abortable abortable;
  private boolean dropOnDeletedTables;
  private boolean dropOnDeletedColumnFamilies;
  private boolean isSerial = false;

  /*
   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating
   * a different Connection implementation, or initializing it in a different way, so
   * createConnection is defined as protected to allow overriding.
   */
  protected Connection createConnection(Configuration conf) throws IOException {
    return ConnectionFactory.createConnection(conf);
  }

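  /*
   * For illustration only (not part of this class): a subclass could override
   * createConnection to adjust client settings before connecting; the tuned
   * key below is a hypothetical example.
   *
   *   @Override
   *   protected Connection createConnection(Configuration conf) throws IOException {
   *     Configuration copy = new Configuration(conf);
   *     copy.setInt("hbase.client.retries.number", 3); // hypothetical tuning
   *     return ConnectionFactory.createConnection(copy);
   *   }
   */
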
  /*
   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating
   * a different ReplicationSinkManager implementation, or initializing it in a different way,
   * so createReplicationSinkManager is defined as protected to allow overriding.
   */
  protected ReplicationSinkManager createReplicationSinkManager(Connection conn) {
    return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(),
      this, this.conf);
  }

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
    // tasks to terminate when doStop() is called.
    long maxTerminationWaitMultiplier = this.conf.getLong(
        "replication.source.maxterminationmultiplier",
        DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
    this.maxTerminationWait = maxTerminationWaitMultiplier *
        this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    // TODO: This connection is replication specific, or we should make it so and apply
    // replication-specific settings such as the compression codec to use when passing Cells.
    Connection connection = createConnection(this.conf);
    // Since createConnection may be overridden by extending classes, we need to make sure
    // it is indeed returning a ClusterConnection instance.
    Preconditions.checkState(connection instanceof ClusterConnection);
    this.conn = (ClusterConnection) connection;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.replicationSinkMgr = createReplicationSinkManager(conn);
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
    this.abortable = ctx.getAbortable();
    // Set the size limit for replication RPCs to 95% of the max request size.
    // We could do with less slop if we had an accurate estimate of encoded size. Being
    // conservative for now.
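    // For a sense of scale: assuming the default hbase.ipc.max.request.size of
    // 256 MB, this caps a single replication RPC at roughly 0.95 * 256 MB, i.e. about 243 MB.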
    this.replicationRpcLimit = (int)(0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
      RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    this.dropOnDeletedTables =
        this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
    this.dropOnDeletedColumnFamilies = this.conf
        .getBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, false);

    this.replicationBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Construct base namespace directory and hfile archive directory path
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
    isSerial = context.getPeerConfig().isSerial();
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

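  // Worked example of the retry backoff implemented below, using the defaults
  // (replication.source.sleepforretries = 1000 ms,
  // replication.source.maxretriesmultiplier = 300): sleeps grow linearly as
  // 1s, 2s, 3s, ... and plateau at 300s once the multiplier stops increasing.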
  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  private int getEstimatedEntrySize(Entry e) {
    long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
    return (int) size;
  }

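  // Example of the parallel batching below: with maxThreads = 10, 3 sinks and
  // 250 entries, n = min(min(10, 250 / 100 + 1), 3) = 3 batches. Entries are
  // assigned by hashing their encoded region name, so edits for the same
  // region gravitate to the same batch; a batch that would exceed
  // replicationRpcLimit is closed out and moved to the tail of the list.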
  private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
    List<List<Entry>> entryLists =
        Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
    int[] sizes = new int[n];
    for (Entry e : entries) {
      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
      int entrySize = getEstimatedEntrySize(e);
      // If this batch has at least one entry and is oversized, move it to the tail of the list
      // and reset entryLists[index] to an empty list.
      if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
        entryLists.add(entryLists.get(index));
        entryLists.set(index, new ArrayList<>());
        sizes[index] = 0;
      }
      entryLists.get(index).add(e);
      sizes[index] += entrySize;
    }
    return entryLists;
  }

  private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
    Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Entry e : entries) {
      regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
        .add(e);
    }
    return new ArrayList<>(regionEntries.values());
  }

  /**
   * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
   * concurrently. Note that, for serial replication, we need to make sure that entries from the
   * same region are replicated serially, so entries from the same region form a single batch, and
   * such a batch is further divided into several batches by replicationRpcLimit in
   * serialReplicateRegionEntries().
   */
  private List<List<Entry>> createBatches(final List<Entry> entries) {
    if (isSerial) {
      return createSerialBatches(entries);
    } else {
      return createParallelBatches(entries);
    }
  }

  /**
   * Check if there's a {@link TableNotFoundException} in the caused-by chain.
   */
  @VisibleForTesting
  public static boolean isTableNotFoundException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (io != null && io.getMessage() != null
        && io.getMessage().contains("TableNotFoundException")) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof TableNotFoundException) {
        return true;
      }
    }
    return false;
  }

  /**
   * Check if there's a {@link NoSuchColumnFamilyException} in the caused-by chain.
   */
  @VisibleForTesting
  public static boolean isNoSuchColumnFamilyException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (io != null && io.getMessage() != null
        && io.getMessage().contains("NoSuchColumnFamilyException")) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof NoSuchColumnFamilyException) {
        return true;
      }
    }
    return false;
  }

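  // The two filters below only take effect when the operator has opted in on
  // the source cluster, i.e. hbase.replication.drop.on.deleted.table = true
  // and/or hbase.replication.drop.on.deleted.columnfamily = true; otherwise a
  // TableNotFoundException or NoSuchColumnFamilyException from the sink keeps
  // the shipment retrying until the schema mismatch is resolved.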
  @VisibleForTesting
  List<List<Entry>> filterNotExistTableEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Boolean> existMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
         Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          boolean exist = true;
          if (existMap.containsKey(tableName)) {
            exist = existMap.get(tableName);
          } else {
            try {
              exist = localAdmin.tableExists(tableName);
              existMap.put(tableName, exist);
            } catch (IOException iox) {
              LOG.warn("Exception checking for local table " + tableName, iox);
              // we can't drop edits without full assurance, so we assume the table exists.
              exist = true;
            }
          }
          if (exist) {
            entries.add(e);
          } else {
            // Would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
            LOG.warn("Missing table detected at sink, local table also does not exist, "
                + "filtering edits for table '{}'", tableName);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

  @VisibleForTesting
  List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Set<String>> existColumnFamilyMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
         Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          if (!existColumnFamilyMap.containsKey(tableName)) {
            try {
              Set<String> cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames().stream()
                  .map(Bytes::toString).collect(Collectors.toSet());
              existColumnFamilyMap.put(tableName, cfs);
            } catch (Exception ex) {
              LOG.warn("Exception getting cf names for local table {}", tableName, ex);
              // If we catch any exception, we are not sure about the table's descriptor,
              // so replicate the raw entry
              entries.add(e);
              continue;
            }
          }

          Set<String> existColumnFamilies = existColumnFamilyMap.get(tableName);
          Set<String> missingCFs = new HashSet<>();
          WALEdit walEdit = new WALEdit();
          walEdit.getCells().addAll(e.getEdit().getCells());
          WALUtil.filterCells(walEdit, cell -> {
            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
            if (existColumnFamilies.contains(cf)) {
              return cell;
            } else {
              missingCFs.add(cf);
              return null;
            }
          });
          if (!walEdit.isEmpty()) {
            Entry newEntry = new Entry(e.getKey(), walEdit);
            entries.add(newEntry);
          }

          if (!missingCFs.isEmpty()) {
            // Would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
            LOG.warn(
                "Missing column family detected at sink, local column family also does not exist,"
                    + " filtering edits for table '{}', column families '{}'", tableName,
                missingCFs);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

  private void reconnectToPeerCluster() {
    ClusterConnection connection = null;
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    } catch (IOException ioe) {
      LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe);
    }
    if (connection != null) {
      this.conn = connection;
    }
  }

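  // Each replicator Callable returns the index of the batch it shipped, so a
  // completed future lets us clear exactly that slot in the batch list;
  // whatever batches remain non-empty after an exception are the ones that
  // replicate() will retry.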
  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
      List<List<Entry>> batches) throws IOException {
    int futures = 0;
    for (int i = 0; i < batches.size(); i++) {
      List<Entry> entries = batches.get(i);
      if (!entries.isEmpty()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
            replicateContext.getSize());
        }
        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
        futures++;
      }
    }

    IOException iox = null;
    long lastWriteTime = 0;
    for (int i = 0; i < futures; i++) {
      try {
        // wait for all futures, remove successful parts
        // (only the remaining parts will be retried)
        Future<Integer> f = pool.take();
        int index = f.get();
        List<Entry> batch = batches.get(index);
        batches.set(index, Collections.emptyList()); // remove successful batch
        // Find the most recent write time in the batch
        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
        if (writeTime > lastWriteTime) {
          lastWriteTime = writeTime;
        }
      } catch (InterruptedException ie) {
        iox = new IOException(ie);
      } catch (ExecutionException ee) {
        iox = ee.getCause() instanceof IOException ?
          (IOException) ee.getCause() : new IOException(ee.getCause());
      }
    }
    if (iox != null) {
      // if we had any exceptions, try again
      throw iox;
    }
    return lastWriteTime;
  }

  /**
   * Do the shipping logic
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
    int sleepMultiplier = 1;

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = replicationSinkMgr.getNumSinks();
    if (numSinks == 0) {
      LOG.warn("{} No replication sinks found, returning without replicating. "
        + "The source should retry with the same set of edits.", logPeerId());
      return false;
    }

    List<List<Entry>> batches = createBatches(replicateContext.getEntries());
    while (this.isRunning() && !exec.isShutdown()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      if (this.conn == null || this.conn.isClosed()) {
        reconnectToPeerCluster();
      }
      try {
        // replicate the batches to the sink side.
        parallelReplicate(pool, replicateContext, batches);
        return true;
      } catch (IOException ioe) {
        if (ioe instanceof RemoteException) {
          if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
            // Only filter the edits to replicate and don't change the entries in replicateContext
            // as the upper layer relies on it.
            batches = filterNotExistTableEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for deleted tables, 0 edits to replicate, "
                + "just return");
              return true;
            }
          } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
            batches = filterNotExistColumnFamilyEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for deleted column families, 0 edits to replicate, "
                + "just return");
              return true;
            }
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing
            // happened. The cluster is alive, so retrying right away, even
            // as a test, just makes things worse.
            sleepForRetries("Encountered a SocketTimeoutException. The call to the remote " +
              "cluster timed out, which is usually caused by a machine failure or a " +
              "massive slowdown", this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
            LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().isPeerEnabled();
  }

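  // With the defaults (termination wait multiplier 2, hbase.rpc.timeout =
  // 60000 ms), doStop() below waits up to two minutes for in-flight
  // replicator tasks to finish before calling abort; see HBASE-16081.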
  @Override
  protected void doStop() {
    disconnect(); // don't call super.doStop()
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("{} Failed to close the connection", logPeerId(), e);
      }
    }
    // Allow currently running replication tasks to finish
    exec.shutdown();
    try {
      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // ignored; the isTerminated() check below handles incomplete shutdown
    }
    // Abort if the tasks did not terminate in time
    if (!exec.isTerminated()) {
      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
        "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
        "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
      abortable.abort(errMsg, new IOException(errMsg));
    }
    notifyStopped();
  }

  @VisibleForTesting
  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
    SinkPeer sinkPeer = null;
    try {
      int entriesHashCode = System.identityHashCode(entries);
      if (LOG.isTraceEnabled()) {
        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
      }
      sinkPeer = replicationSinkMgr.getReplicationSink();
      BlockingInterface rrs = sinkPeer.getRegionServer();
      try {
        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
          replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
        }
        throw e;
      }
      replicationSinkMgr.reportSinkSuccess(sinkPeer);
    } catch (IOException ioe) {
      if (sinkPeer != null) {
        replicationSinkMgr.reportBadSink(sinkPeer);
      }
      throw ioe;
    }
    return batchIndex;
  }

  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
    int batchSize = 0, index = 0;
    List<Entry> batch = new ArrayList<>();
    for (Entry entry : entries) {
      int entrySize = getEstimatedEntrySize(entry);
      // Ship the accumulated batch before it would exceed the RPC size limit
      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
        replicateEntries(batch, index++, timeout);
        batch.clear();
        batchSize = 0;
      }
      batch.add(entry);
      batchSize += entrySize;
    }
    if (batchSize > 0) {
      replicateEntries(batch, index, timeout);
    }
    return batchIndex;
  }

  @VisibleForTesting
  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
        : () -> replicateEntries(entries, batchIndex, timeout);
  }

  private String logPeerId() {
    return "[Source for peer " + this.ctx.getPeerId() + "]:";
  }
}