001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.net.ConnectException; 022import java.net.SocketTimeoutException; 023import java.net.UnknownHostException; 024import java.util.ArrayList; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.TreeMap; 032import java.util.concurrent.Callable; 033import java.util.concurrent.CompletionService; 034import java.util.concurrent.ExecutionException; 035import java.util.concurrent.ExecutorCompletionService; 036import java.util.concurrent.Future; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.stream.Collectors; 040import java.util.stream.Stream; 041import org.apache.commons.lang3.StringUtils; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.hbase.Abortable; 045import org.apache.hadoop.hbase.CellUtil; 046import org.apache.hadoop.hbase.HBaseConfiguration; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.TableNotFoundException; 050import org.apache.hadoop.hbase.client.Admin; 051import org.apache.hadoop.hbase.client.ClusterConnection; 052import org.apache.hadoop.hbase.client.Connection; 053import org.apache.hadoop.hbase.client.ConnectionFactory; 054import org.apache.hadoop.hbase.ipc.CallTimeoutException; 055import org.apache.hadoop.hbase.ipc.RpcServer; 056import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; 057import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 058import org.apache.hadoop.hbase.regionserver.wal.WALUtil; 059import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; 060import org.apache.hadoop.hbase.replication.ReplicationUtils; 061import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.CommonFSUtils; 064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 065import org.apache.hadoop.hbase.util.Threads; 066import org.apache.hadoop.hbase.wal.WAL.Entry; 067import org.apache.hadoop.hbase.wal.WALEdit; 068import org.apache.hadoop.ipc.RemoteException; 069import org.apache.yetus.audience.InterfaceAudience; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 074import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 075 076import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface; 077 078/** 079 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating 080 * to another HBase cluster. For the slave cluster it selects a random number of peers using a 081 * replication ratio. For example, if replication ration = 0.1 and slave cluster has 100 region 082 * servers, 10 will be selected. 083 * <p> 084 * A stream is considered down when we cannot contact a region server on the peer cluster for more 085 * than 55 seconds by default. 086 * </p> 087 */ 088@InterfaceAudience.Private 089public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint { 090 private static final Logger LOG = 091 LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class); 092 093 private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2; 094 095 /** Drop edits for tables that been deleted from the replication source and target */ 096 public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY = 097 "hbase.replication.drop.on.deleted.table"; 098 /** Drop edits for CFs that been deleted from the replication source and target */ 099 public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY = 100 "hbase.replication.drop.on.deleted.columnfamily"; 101 102 private ClusterConnection conn; 103 private Configuration localConf; 104 private Configuration conf; 105 // How long should we sleep for each retry 106 private long sleepForRetries; 107 // Maximum number of retries before taking bold actions 108 private int maxRetriesMultiplier; 109 // Socket timeouts require even bolder actions since we don't want to DDOS 110 private int socketTimeoutMultiplier; 111 // Amount of time for shutdown to wait for all tasks to complete 112 private long maxTerminationWait; 113 // Size limit for replication RPCs, in bytes 114 private int replicationRpcLimit; 115 // Metrics for this source 116 private MetricsSource metrics; 117 // Handles connecting to peer region servers 118 private ReplicationSinkManager replicationSinkMgr; 119 private boolean peersSelected = false; 120 private String replicationClusterId = ""; 121 private ThreadPoolExecutor exec; 122 private int maxThreads; 123 private Path baseNamespaceDir; 124 private Path hfileArchiveDir; 125 private boolean replicationBulkLoadDataEnabled; 126 private Abortable abortable; 127 private boolean dropOnDeletedTables; 128 private boolean dropOnDeletedColumnFamilies; 129 private boolean isSerial = false; 130 // Initialising as 0 to guarantee at least one logging message 131 private long lastSinkFetchTime = 0; 132 133 /* 134 * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating 135 * different Connection implementations, or initialize it in a different way, so defining 136 * createConnection as protected for possible overridings. 137 */ 138 protected Connection createConnection(Configuration conf) throws IOException { 139 return ConnectionFactory.createConnection(conf); 140 } 141 142 /* 143 * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating 144 * different ReplicationSinkManager implementations, or initialize it in a different way, so 145 * defining createReplicationSinkManager as protected for possible overridings. 146 */ 147 protected ReplicationSinkManager createReplicationSinkManager(Connection conn) { 148 return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(), this, 149 this.conf); 150 } 151 152 @Override 153 public void init(Context context) throws IOException { 154 super.init(context); 155 this.conf = HBaseConfiguration.create(ctx.getConfiguration()); 156 this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration()); 157 decorateConf(); 158 this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); 159 this.socketTimeoutMultiplier = 160 this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier); 161 // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator 162 // tasks to terminate when doStop() is called. 163 long maxTerminationWaitMultiplier = this.conf.getLong( 164 "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER); 165 this.maxTerminationWait = maxTerminationWaitMultiplier 166 * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 167 // TODO: This connection is replication specific or we should make it particular to 168 // replication and make replication specific settings such as compression or codec to use 169 // passing Cells. 170 Connection connection = createConnection(this.conf); 171 // Since createConnection method may be overridden by extending classes, we need to make sure 172 // it's indeed returning a ClusterConnection instance. 173 Preconditions.checkState(connection instanceof ClusterConnection); 174 this.conn = (ClusterConnection) connection; 175 this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); 176 this.metrics = context.getMetrics(); 177 // ReplicationQueueInfo parses the peerId out of the znode for us 178 this.replicationSinkMgr = createReplicationSinkManager(conn); 179 // per sink thread pool 180 this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 181 HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT); 182 this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS, 183 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build()); 184 this.abortable = ctx.getAbortable(); 185 // Set the size limit for replication RPCs to 95% of the max request size. 186 // We could do with less slop if we have an accurate estimate of encoded size. Being 187 // conservative for now. 188 this.replicationRpcLimit = 189 (int) (0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE, RpcServer.DEFAULT_MAX_REQUEST_SIZE)); 190 this.dropOnDeletedTables = this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); 191 this.dropOnDeletedColumnFamilies = 192 this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, false); 193 194 this.replicationBulkLoadDataEnabled = conf.getBoolean( 195 HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); 196 if (this.replicationBulkLoadDataEnabled) { 197 replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID); 198 } 199 // Construct base namespace directory and hfile archive directory path 200 Path rootDir = CommonFSUtils.getRootDir(conf); 201 Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR); 202 baseNamespaceDir = new Path(rootDir, baseNSDir); 203 hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir)); 204 isSerial = context.getPeerConfig().isSerial(); 205 } 206 207 private void decorateConf() { 208 String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY); 209 if (StringUtils.isNotEmpty(replicationCodec)) { 210 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec); 211 } 212 } 213 214 private void connectToPeers() { 215 getRegionServers(); 216 217 int sleepMultiplier = 1; 218 219 // Connect to peer cluster first, unless we have to stop 220 while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) { 221 replicationSinkMgr.chooseSinks(); 222 if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) { 223 if (sleepForRetries("Waiting for peers", sleepMultiplier)) { 224 sleepMultiplier++; 225 } 226 } 227 } 228 } 229 230 /** 231 * Do the sleeping logic 232 * @param msg Why we sleep 233 * @param sleepMultiplier by how many times the default sleeping time is augmented 234 * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code> 235 */ 236 private boolean sleepForRetries(String msg, int sleepMultiplier) { 237 try { 238 if (LOG.isTraceEnabled()) { 239 LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries, 240 sleepMultiplier); 241 } 242 Thread.sleep(this.sleepForRetries * sleepMultiplier); 243 } catch (InterruptedException e) { 244 Thread.currentThread().interrupt(); 245 if (LOG.isDebugEnabled()) { 246 LOG.debug("{} {} Interrupted while sleeping between retries", msg, logPeerId()); 247 } 248 } 249 return sleepMultiplier < maxRetriesMultiplier; 250 } 251 252 private int getEstimatedEntrySize(Entry e) { 253 long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf(); 254 return (int) size; 255 } 256 257 private List<List<Entry>> createParallelBatches(final List<Entry> entries) { 258 int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1); 259 int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks); 260 List<List<Entry>> entryLists = 261 Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList()); 262 int[] sizes = new int[n]; 263 for (Entry e : entries) { 264 int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n); 265 int entrySize = getEstimatedEntrySize(e); 266 // If this batch has at least one entry and is over sized, move it to the tail of list and 267 // initialize the entryLists[index] to be a empty list. 268 if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) { 269 entryLists.add(entryLists.get(index)); 270 entryLists.set(index, new ArrayList<>()); 271 sizes[index] = 0; 272 } 273 entryLists.get(index).add(e); 274 sizes[index] += entrySize; 275 } 276 return entryLists; 277 } 278 279 private List<List<Entry>> createSerialBatches(final List<Entry> entries) { 280 Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR); 281 for (Entry e : entries) { 282 regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>()) 283 .add(e); 284 } 285 return new ArrayList<>(regionEntries.values()); 286 } 287 288 /** 289 * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool 290 * concurrently. Note that, for serial replication, we need to make sure that entries from the 291 * same region to be replicated serially, so entries from the same region consist of a batch, and 292 * we will divide a batch into several batches by replicationRpcLimit in method 293 * serialReplicateRegionEntries() 294 */ 295 private List<List<Entry>> createBatches(final List<Entry> entries) { 296 if (isSerial) { 297 return createSerialBatches(entries); 298 } else { 299 return createParallelBatches(entries); 300 } 301 } 302 303 /** 304 * Check if there's an {@link TableNotFoundException} in the caused by stacktrace. 305 */ 306 public static boolean isTableNotFoundException(Throwable io) { 307 if (io instanceof RemoteException) { 308 io = ((RemoteException) io).unwrapRemoteException(); 309 } 310 if (io != null && io.getMessage().contains("TableNotFoundException")) { 311 return true; 312 } 313 for (; io != null; io = io.getCause()) { 314 if (io instanceof TableNotFoundException) { 315 return true; 316 } 317 } 318 return false; 319 } 320 321 /** 322 * Check if there's an {@link NoSuchColumnFamilyException} in the caused by stacktrace. 323 */ 324 public static boolean isNoSuchColumnFamilyException(Throwable io) { 325 if (io instanceof RemoteException) { 326 io = ((RemoteException) io).unwrapRemoteException(); 327 } 328 if (io != null && io.getMessage().contains("NoSuchColumnFamilyException")) { 329 return true; 330 } 331 for (; io != null; io = io.getCause()) { 332 if (io instanceof NoSuchColumnFamilyException) { 333 return true; 334 } 335 } 336 return false; 337 } 338 339 List<List<Entry>> filterNotExistTableEdits(final List<List<Entry>> oldEntryList) { 340 List<List<Entry>> entryList = new ArrayList<>(); 341 Map<TableName, Boolean> existMap = new HashMap<>(); 342 try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration()); 343 Admin localAdmin = localConn.getAdmin()) { 344 for (List<Entry> oldEntries : oldEntryList) { 345 List<Entry> entries = new ArrayList<>(); 346 for (Entry e : oldEntries) { 347 TableName tableName = e.getKey().getTableName(); 348 boolean exist = true; 349 if (existMap.containsKey(tableName)) { 350 exist = existMap.get(tableName); 351 } else { 352 try { 353 exist = localAdmin.tableExists(tableName); 354 existMap.put(tableName, exist); 355 } catch (IOException iox) { 356 LOG.warn("Exception checking for local table " + tableName, iox); 357 // we can't drop edits without full assurance, so we assume table exists. 358 exist = true; 359 } 360 } 361 if (exist) { 362 entries.add(e); 363 } else { 364 // Would potentially be better to retry in one of the outer loops 365 // and add a table filter there; but that would break the encapsulation, 366 // so we're doing the filtering here. 367 LOG.warn("Missing table detected at sink, local table also does not exist, " 368 + "filtering edits for table '{}'", tableName); 369 } 370 } 371 if (!entries.isEmpty()) { 372 entryList.add(entries); 373 } 374 } 375 } catch (IOException iox) { 376 LOG.warn("Exception when creating connection to check local table", iox); 377 return oldEntryList; 378 } 379 return entryList; 380 } 381 382 List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>> oldEntryList) { 383 List<List<Entry>> entryList = new ArrayList<>(); 384 Map<TableName, Set<String>> existColumnFamilyMap = new HashMap<>(); 385 try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration()); 386 Admin localAdmin = localConn.getAdmin()) { 387 for (List<Entry> oldEntries : oldEntryList) { 388 List<Entry> entries = new ArrayList<>(); 389 for (Entry e : oldEntries) { 390 TableName tableName = e.getKey().getTableName(); 391 if (!existColumnFamilyMap.containsKey(tableName)) { 392 try { 393 Set<String> cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames().stream() 394 .map(Bytes::toString).collect(Collectors.toSet()); 395 existColumnFamilyMap.put(tableName, cfs); 396 } catch (Exception ex) { 397 LOG.warn("Exception getting cf names for local table {}", tableName, ex); 398 // if catch any exception, we are not sure about table's description, 399 // so replicate raw entry 400 entries.add(e); 401 continue; 402 } 403 } 404 405 Set<String> existColumnFamilies = existColumnFamilyMap.get(tableName); 406 Set<String> missingCFs = new HashSet<>(); 407 WALEdit walEdit = new WALEdit(); 408 walEdit.getCells().addAll(e.getEdit().getCells()); 409 WALUtil.filterCells(walEdit, cell -> { 410 String cf = Bytes.toString(CellUtil.cloneFamily(cell)); 411 if (existColumnFamilies.contains(cf)) { 412 return cell; 413 } else { 414 missingCFs.add(cf); 415 return null; 416 } 417 }); 418 if (!walEdit.isEmpty()) { 419 Entry newEntry = new Entry(e.getKey(), walEdit); 420 entries.add(newEntry); 421 } 422 423 if (!missingCFs.isEmpty()) { 424 // Would potentially be better to retry in one of the outer loops 425 // and add a table filter there; but that would break the encapsulation, 426 // so we're doing the filtering here. 427 LOG.warn( 428 "Missing column family detected at sink, local column family also does not exist," 429 + " filtering edits for table '{}',column family '{}'", 430 tableName, missingCFs); 431 } 432 } 433 if (!entries.isEmpty()) { 434 entryList.add(entries); 435 } 436 } 437 } catch (IOException iox) { 438 LOG.warn("Exception when creating connection to check local table", iox); 439 return oldEntryList; 440 } 441 return entryList; 442 } 443 444 private void reconnectToPeerCluster() { 445 ClusterConnection connection = null; 446 try { 447 connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf); 448 } catch (IOException ioe) { 449 LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe); 450 } 451 if (connection != null) { 452 this.conn = connection; 453 } 454 } 455 456 private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext, 457 List<List<Entry>> batches) throws IOException { 458 int futures = 0; 459 for (int i = 0; i < batches.size(); i++) { 460 List<Entry> entries = batches.get(i); 461 if (!entries.isEmpty()) { 462 if (LOG.isTraceEnabled()) { 463 LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(), 464 replicateContext.getSize()); 465 } 466 // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource 467 pool.submit(createReplicator(entries, i, replicateContext.getTimeout())); 468 futures++; 469 } 470 } 471 472 IOException iox = null; 473 long lastWriteTime = 0; 474 for (int i = 0; i < futures; i++) { 475 try { 476 // wait for all futures, remove successful parts 477 // (only the remaining parts will be retried) 478 Future<Integer> f = pool.take(); 479 int index = f.get(); 480 List<Entry> batch = batches.get(index); 481 batches.set(index, Collections.emptyList()); // remove successful batch 482 // Find the most recent write time in the batch 483 long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime(); 484 if (writeTime > lastWriteTime) { 485 lastWriteTime = writeTime; 486 } 487 } catch (InterruptedException ie) { 488 iox = new IOException(ie); 489 } catch (ExecutionException ee) { 490 iox = ee.getCause() instanceof IOException 491 ? (IOException) ee.getCause() 492 : new IOException(ee.getCause()); 493 } 494 } 495 if (iox != null) { 496 // if we had any exceptions, try again 497 throw iox; 498 } 499 return lastWriteTime; 500 } 501 502 /** 503 * Do the shipping logic 504 */ 505 @Override 506 public boolean replicate(ReplicateContext replicateContext) { 507 CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec); 508 int sleepMultiplier = 1; 509 int initialTimeout = replicateContext.getTimeout(); 510 511 if (!peersSelected && this.isRunning()) { 512 connectToPeers(); 513 peersSelected = true; 514 } 515 516 int numSinks = replicationSinkMgr.getNumSinks(); 517 if (numSinks == 0) { 518 if ( 519 (EnvironmentEdgeManager.currentTime() - lastSinkFetchTime) >= (maxRetriesMultiplier * 1000) 520 ) { 521 LOG.warn("No replication sinks found, returning without replicating. " 522 + "The source should retry with the same set of edits. Not logging this again for " 523 + "the next {} seconds.", maxRetriesMultiplier); 524 lastSinkFetchTime = EnvironmentEdgeManager.currentTime(); 525 } 526 sleepForRetries("No sinks available at peer", sleepMultiplier); 527 return false; 528 } 529 530 List<List<Entry>> batches = createBatches(replicateContext.getEntries()); 531 while (this.isRunning() && !exec.isShutdown()) { 532 if (!isPeerEnabled()) { 533 if (sleepForRetries("Replication is disabled", sleepMultiplier)) { 534 sleepMultiplier++; 535 } 536 continue; 537 } 538 if (this.conn == null || this.conn.isClosed()) { 539 reconnectToPeerCluster(); 540 } 541 try { 542 // replicate the batches to sink side. 543 parallelReplicate(pool, replicateContext, batches); 544 return true; 545 } catch (IOException ioe) { 546 if (ioe instanceof RemoteException) { 547 if (dropOnDeletedTables && isTableNotFoundException(ioe)) { 548 // Only filter the edits to replicate and don't change the entries in replicateContext 549 // as the upper layer rely on it. 550 batches = filterNotExistTableEdits(batches); 551 if (batches.isEmpty()) { 552 LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return"); 553 return true; 554 } 555 } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) { 556 batches = filterNotExistColumnFamilyEdits(batches); 557 if (batches.isEmpty()) { 558 LOG.warn("After filter not exist column family's edits, 0 edits to replicate, " 559 + "just return"); 560 return true; 561 } 562 } else { 563 LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(), 564 ioe); 565 replicationSinkMgr.chooseSinks(); 566 } 567 } else { 568 if (ioe instanceof SocketTimeoutException) { 569 // This exception means we waited for more than 60s and nothing 570 // happened, the cluster is alive and calling it right away 571 // even for a test just makes things worse. 572 sleepForRetries( 573 "Encountered a SocketTimeoutException. Since the " 574 + "call to the remote cluster timed out, which is usually " 575 + "caused by a machine failure or a massive slowdown", 576 this.socketTimeoutMultiplier); 577 } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) { 578 LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe); 579 replicationSinkMgr.chooseSinks(); 580 } else if (ioe instanceof CallTimeoutException) { 581 replicateContext 582 .setTimeout(ReplicationUtils.getAdaptiveTimeout(initialTimeout, sleepMultiplier)); 583 } else { 584 LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe); 585 } 586 } 587 if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { 588 sleepMultiplier++; 589 } 590 } 591 } 592 return false; // in case we exited before replicating 593 } 594 595 protected boolean isPeerEnabled() { 596 return ctx.getReplicationPeer().isPeerEnabled(); 597 } 598 599 @Override 600 protected void doStop() { 601 disconnect(); // don't call super.doStop() 602 if (this.conn != null) { 603 try { 604 this.conn.close(); 605 this.conn = null; 606 } catch (IOException e) { 607 LOG.warn("{} Failed to close the connection", logPeerId()); 608 } 609 } 610 // Allow currently running replication tasks to finish 611 exec.shutdown(); 612 try { 613 exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS); 614 } catch (InterruptedException e) { 615 } 616 // Abort if the tasks did not terminate in time 617 if (!exec.isTerminated()) { 618 String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " 619 + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " 620 + "Aborting to prevent Replication from deadlocking. See HBASE-16081."; 621 abortable.abort(errMsg, new IOException(errMsg)); 622 } 623 notifyStopped(); 624 } 625 626 protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout) 627 throws IOException { 628 SinkPeer sinkPeer = null; 629 try { 630 int entriesHashCode = System.identityHashCode(entries); 631 if (LOG.isTraceEnabled()) { 632 long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum(); 633 LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", 634 logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId); 635 } 636 sinkPeer = replicationSinkMgr.getReplicationSink(); 637 BlockingInterface rrs = sinkPeer.getRegionServer(); 638 try { 639 ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]), 640 replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout); 641 if (LOG.isTraceEnabled()) { 642 LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode); 643 } 644 } catch (IOException e) { 645 if (LOG.isTraceEnabled()) { 646 LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e); 647 } 648 throw e; 649 } 650 replicationSinkMgr.reportSinkSuccess(sinkPeer); 651 } catch (IOException ioe) { 652 if (sinkPeer != null) { 653 replicationSinkMgr.reportBadSink(sinkPeer); 654 } 655 throw ioe; 656 } 657 return batchIndex; 658 } 659 660 private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout) 661 throws IOException { 662 int batchSize = 0, index = 0; 663 List<Entry> batch = new ArrayList<>(); 664 for (Entry entry : entries) { 665 int entrySize = getEstimatedEntrySize(entry); 666 if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) { 667 replicateEntries(batch, index++, timeout); 668 batch.clear(); 669 batchSize = 0; 670 } 671 batch.add(entry); 672 batchSize += entrySize; 673 } 674 if (batchSize > 0) { 675 replicateEntries(batch, index, timeout); 676 } 677 return batchIndex; 678 } 679 680 protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) { 681 return isSerial 682 ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout) 683 : () -> replicateEntries(entries, batchIndex, timeout); 684 } 685 686 private String logPeerId() { 687 return "[Source for peer " + this.ctx.getPeerId() + "]:"; 688 } 689 690}