/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * For the slave cluster it selects a random subset of region servers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
  private static final Logger LOG =
      LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);

  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;

  /** Drop edits for tables that have been deleted from both the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
      "hbase.replication.drop.on.deleted.table";
  /** Drop edits for CFs that have been deleted from both the replication source and target */
  public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY =
      "hbase.replication.drop.on.deleted.columnfamily";

  private ClusterConnection conn;
  private Configuration localConf;
  private Configuration conf;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Socket timeouts require even bolder actions since we don't want to DDOS
  private int socketTimeoutMultiplier;
  // Amount of time for shutdown to wait for all tasks to complete
  private long maxTerminationWait;
  // Size limit for replication RPCs, in bytes
  private int replicationRpcLimit;
  // Metrics for this source
  private MetricsSource metrics;
  // Handles connecting to peer region servers
  private ReplicationSinkManager replicationSinkMgr;
  private boolean peersSelected = false;
  private String replicationClusterId = "";
  private ThreadPoolExecutor exec;
  private int maxThreads;
  private Path baseNamespaceDir;
  private Path hfileArchiveDir;
  private boolean replicationBulkLoadDataEnabled;
  private Abortable abortable;
  private boolean dropOnDeletedTables;
  private boolean dropOnDeletedColumnFamilies;
  private boolean isSerial = false;
  // Initialising as 0 to guarantee at least one logging message
  private long lastSinkFetchTime = 0;

  /*
   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating a
   * different Connection implementation, or initializing it in a different way, so
   * createConnection is defined as protected to allow subclasses to override it.
   */
  protected Connection createConnection(Configuration conf) throws IOException {
    return ConnectionFactory.createConnection(conf);
  }

  /*
   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating a
   * different ReplicationSinkManager implementation, or initializing it in a different way, so
   * createReplicationSinkManager is defined as protected to allow subclasses to override it.
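   * For example (hypothetical subclass), an override could return a manager built around a
   * custom ClusterConnection implementation; the only requirement assumed here is that the
   * Connection handed in can be cast to ClusterConnection, as this default implementation does.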
   */
  protected ReplicationSinkManager createReplicationSinkManager(Connection conn) {
    return new ReplicationSinkManager((ClusterConnection) conn, this.ctx.getPeerId(),
      this, this.conf);
  }

  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
    decorateConf();
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
        maxRetriesMultiplier);
    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
    // tasks to terminate when doStop() is called.
    long maxTerminationWaitMultiplier = this.conf.getLong(
        "replication.source.maxterminationmultiplier",
        DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
    this.maxTerminationWait = maxTerminationWaitMultiplier *
        this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    // TODO: This connection is replication specific, or we should make it so, and apply
    // replication-specific settings such as the compression or codec to use when passing Cells.
    Connection connection = createConnection(this.conf);
    // Since the createConnection method may be overridden by extending classes, we need to make
    // sure it is indeed returning a ClusterConnection instance.
    Preconditions.checkState(connection instanceof ClusterConnection);
    this.conn = (ClusterConnection) connection;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.metrics = context.getMetrics();
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.replicationSinkMgr = createReplicationSinkManager(conn);
    // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
        HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
    this.abortable = ctx.getAbortable();
    // Set the size limit for replication RPCs to 95% of the max request size.
    // We could do with less slop if we had an accurate estimate of the encoded size. Being
    // conservative for now.
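    // For example (illustrative numbers only, assuming a 256 MB hbase.ipc.max.request.size):
    // the limit works out to roughly 0.95 * 256 MB, i.e. about 243 MB per replication call.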
    this.replicationRpcLimit = (int) (0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
        RpcServer.DEFAULT_MAX_REQUEST_SIZE));
    this.dropOnDeletedTables =
        this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
    this.dropOnDeletedColumnFamilies = this.conf
        .getBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, false);

    this.replicationBulkLoadDataEnabled =
        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
            HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    if (this.replicationBulkLoadDataEnabled) {
      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
    }
    // Construct base namespace directory and hfile archive directory path
    Path rootDir = CommonFSUtils.getRootDir(conf);
    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
    baseNamespaceDir = new Path(rootDir, baseNSDir);
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
    isSerial = context.getPeerConfig().isSerial();
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  private void connectToPeers() {
    getRegionServers();

    int sleepMultiplier = 1;

    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
      replicationSinkMgr.chooseSinks();
      if (this.isRunning() && replicationSinkMgr.getNumSinks() == 0) {
        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  /**
   * Do the sleeping logic.
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  private boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} {} Interrupted while sleeping between retries", logPeerId(), msg);
      }
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  private int getEstimatedEntrySize(Entry e) {
    long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
    return (int) size;
  }

  private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
    List<List<Entry>> entryLists =
        Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
    int[] sizes = new int[n];
    for (Entry e : entries) {
      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
      int entrySize = getEstimatedEntrySize(e);
      // If this batch has at least one entry and is oversized, move it to the tail of the list
      // and initialize entryLists[index] to be an empty list.
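      // (The batch that just overflowed is appended to the tail, so it is still shipped in this
      // round; subsequent edits for regions hashing to this index start a fresh list.)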
      if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
        entryLists.add(entryLists.get(index));
        entryLists.set(index, new ArrayList<>());
        sizes[index] = 0;
      }
      entryLists.get(index).add(e);
      sizes[index] += entrySize;
    }
    return entryLists;
  }

  private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
    Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Entry e : entries) {
      regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
        .add(e);
    }
    return new ArrayList<>(regionEntries.values());
  }

  /**
   * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
   * concurrently. Note that, for serial replication, we need to make sure that entries from the
   * same region are replicated serially, so entries from the same region form a single batch,
   * which is then split into several sub-batches by replicationRpcLimit in
   * serialReplicateRegionEntries().
   */
  private List<List<Entry>> createBatches(final List<Entry> entries) {
    if (isSerial) {
      return createSerialBatches(entries);
    } else {
      return createParallelBatches(entries);
    }
  }

  /**
   * Check if there's a {@link TableNotFoundException} in the cause chain.
   */
  public static boolean isTableNotFoundException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (io != null && io.getMessage() != null
        && io.getMessage().contains("TableNotFoundException")) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof TableNotFoundException) {
        return true;
      }
    }
    return false;
  }

  /**
   * Check if there's a {@link NoSuchColumnFamilyException} in the cause chain.
   */
  public static boolean isNoSuchColumnFamilyException(Throwable io) {
    if (io instanceof RemoteException) {
      io = ((RemoteException) io).unwrapRemoteException();
    }
    if (io != null && io.getMessage() != null
        && io.getMessage().contains("NoSuchColumnFamilyException")) {
      return true;
    }
    for (; io != null; io = io.getCause()) {
      if (io instanceof NoSuchColumnFamilyException) {
        return true;
      }
    }
    return false;
  }

  List<List<Entry>> filterNotExistTableEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Boolean> existMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
        Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          boolean exist = true;
          if (existMap.containsKey(tableName)) {
            exist = existMap.get(tableName);
          } else {
            try {
              exist = localAdmin.tableExists(tableName);
              existMap.put(tableName, exist);
            } catch (IOException iox) {
              LOG.warn("Exception checking for local table " + tableName, iox);
              // we can't drop edits without full assurance, so we assume the table exists.
              exist = true;
            }
          }
          if (exist) {
            entries.add(e);
          } else {
            // Would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
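            // Dropping is safe here only because the table is confirmed absent on the local
            // (source) cluster as well; if the existence check fails we keep the edit instead.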
            LOG.warn("Missing table detected at sink, local table also does not exist, "
                + "filtering edits for table '{}'", tableName);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

  List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>> oldEntryList) {
    List<List<Entry>> entryList = new ArrayList<>();
    Map<TableName, Set<String>> existColumnFamilyMap = new HashMap<>();
    try (Connection localConn = ConnectionFactory.createConnection(ctx.getLocalConfiguration());
        Admin localAdmin = localConn.getAdmin()) {
      for (List<Entry> oldEntries : oldEntryList) {
        List<Entry> entries = new ArrayList<>();
        for (Entry e : oldEntries) {
          TableName tableName = e.getKey().getTableName();
          if (!existColumnFamilyMap.containsKey(tableName)) {
            try {
              Set<String> cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames().stream()
                  .map(Bytes::toString).collect(Collectors.toSet());
              existColumnFamilyMap.put(tableName, cfs);
            } catch (Exception ex) {
              LOG.warn("Exception getting cf names for local table {}", tableName, ex);
              // If we catch any exception, we are not sure about the table's descriptor,
              // so replicate the raw entry.
              entries.add(e);
              continue;
            }
          }

          Set<String> existColumnFamilies = existColumnFamilyMap.get(tableName);
          Set<String> missingCFs = new HashSet<>();
          WALEdit walEdit = new WALEdit();
          walEdit.getCells().addAll(e.getEdit().getCells());
          WALUtil.filterCells(walEdit, cell -> {
            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
            if (existColumnFamilies.contains(cf)) {
              return cell;
            } else {
              missingCFs.add(cf);
              return null;
            }
          });
          if (!walEdit.isEmpty()) {
            Entry newEntry = new Entry(e.getKey(), walEdit);
            entries.add(newEntry);
          }

          if (!missingCFs.isEmpty()) {
            // Would potentially be better to retry in one of the outer loops
            // and add a table filter there; but that would break the encapsulation,
            // so we're doing the filtering here.
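            // Only the cells of the missing column families were dropped above; the entry's
            // remaining cells were kept in the rebuilt WALEdit and will still be replicated.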
            LOG.warn(
              "Missing column family detected at sink, local column family also does not exist,"
                + " filtering edits for table '{}', column family '{}'", tableName, missingCFs);
          }
        }
        if (!entries.isEmpty()) {
          entryList.add(entries);
        }
      }
    } catch (IOException iox) {
      LOG.warn("Exception when creating connection to check local table", iox);
      return oldEntryList;
    }
    return entryList;
  }

  private void reconnectToPeerCluster() {
    ClusterConnection connection = null;
    try {
      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
    } catch (IOException ioe) {
      LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe);
    }
    if (connection != null) {
      this.conn = connection;
    }
  }

  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
      List<List<Entry>> batches) throws IOException {
    int futures = 0;
    for (int i = 0; i < batches.size(); i++) {
      List<Entry> entries = batches.get(i);
      if (!entries.isEmpty()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
            replicateContext.getSize());
        }
        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
        futures++;
      }
    }

    IOException iox = null;
    long lastWriteTime = 0;
    for (int i = 0; i < futures; i++) {
      try {
        // wait for all futures, remove successful parts
        // (only the remaining parts will be retried)
        Future<Integer> f = pool.take();
        int index = f.get();
        List<Entry> batch = batches.get(index);
        batches.set(index, Collections.emptyList()); // remove successful batch
        // Find the most recent write time in the batch
        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
        if (writeTime > lastWriteTime) {
          lastWriteTime = writeTime;
        }
      } catch (InterruptedException ie) {
        iox = new IOException(ie);
      } catch (ExecutionException ee) {
        iox = ee.getCause() instanceof IOException
            ? (IOException) ee.getCause() : new IOException(ee.getCause());
      }
    }
    if (iox != null) {
      // if we had any exceptions, try again
      throw iox;
    }
    return lastWriteTime;
  }

  /**
   * Do the shipping logic.
   */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
    int sleepMultiplier = 1;

    if (!peersSelected && this.isRunning()) {
      connectToPeers();
      peersSelected = true;
    }

    int numSinks = replicationSinkMgr.getNumSinks();
    if (numSinks == 0) {
      if ((System.currentTimeMillis() - lastSinkFetchTime) >= (maxRetriesMultiplier * 1000)) {
        LOG.warn(
          "No replication sinks found, returning without replicating. "
            + "The source should retry with the same set of edits. Not logging this again for "
            + "the next {} seconds.", maxRetriesMultiplier);
        lastSinkFetchTime = System.currentTimeMillis();
      }
      sleepForRetries("No sinks available at peer", sleepMultiplier);
      return false;
    }

    List<List<Entry>> batches = createBatches(replicateContext.getEntries());
    while (this.isRunning() && !exec.isShutdown()) {
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      if (this.conn == null || this.conn.isClosed()) {
        reconnectToPeerCluster();
      }
      try {
        // replicate the batches to sink side.
        parallelReplicate(pool, replicateContext, batches);
        return true;
      } catch (IOException ioe) {
        if (ioe instanceof RemoteException) {
          if (dropOnDeletedTables && isTableNotFoundException(ioe)) {
            // Only filter the edits to replicate and don't change the entries in replicateContext
            // as the upper layer relies on it.
            batches = filterNotExistTableEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for deleted tables, 0 edits remain to replicate, "
                  + "returning");
              return true;
            }
          } else if (dropOnDeletedColumnFamilies && isNoSuchColumnFamilyException(ioe)) {
            batches = filterNotExistColumnFamilyEdits(batches);
            if (batches.isEmpty()) {
              LOG.warn("After filtering edits for deleted column families, 0 edits remain to "
                  + "replicate, returning");
              return true;
            }
          } else {
            LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
              ioe);
            replicationSinkMgr.chooseSinks();
          }
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing happened; the cluster
            // is alive and calling it right away, even for a test, just makes things worse.
            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
              "call to the remote cluster timed out, which is usually " +
              "caused by a machine failure or a massive slowdown",
              this.socketTimeoutMultiplier);
          } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
            LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
            replicationSinkMgr.chooseSinks();
          } else {
            LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
          }
        }
        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    return false; // in case we exited before replicating
  }

  protected boolean isPeerEnabled() {
    return ctx.getReplicationPeer().isPeerEnabled();
  }

  @Override
  protected void doStop() {
    disconnect(); // don't call super.doStop()
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        LOG.warn("{} Failed to close the connection", logPeerId(), e);
      }
    }
    // Allow currently running replication tasks to finish
    exec.shutdown();
    try {
      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Ignored; if the tasks did not finish in time we abort below anyway.
    }
    // Abort if the tasks did not terminate in time
    if (!exec.isTerminated()) {
      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
        "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
        "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
      abortable.abort(errMsg, new IOException(errMsg));
    }
    notifyStopped();
  }

  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
    SinkPeer sinkPeer = null;
    try {
      int entriesHashCode = System.identityHashCode(entries);
      if (LOG.isTraceEnabled()) {
        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
      }
      sinkPeer = replicationSinkMgr.getReplicationSink();
      BlockingInterface rrs = sinkPeer.getRegionServer();
      try {
        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
          replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
        }
        throw e;
      }
      replicationSinkMgr.reportSinkSuccess(sinkPeer);
    } catch (IOException ioe) {
      if (sinkPeer != null) {
        replicationSinkMgr.reportBadSink(sinkPeer);
      }
      throw ioe;
    }
    return batchIndex;
  }

  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
      throws IOException {
    int batchSize = 0, index = 0;
    List<Entry> batch = new ArrayList<>();
    for (Entry entry : entries) {
      int entrySize = getEstimatedEntrySize(entry);
      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
        replicateEntries(batch, index++, timeout);
        batch.clear();
        batchSize = 0;
      }
      batch.add(entry);
      batchSize += entrySize;
    }
    if (batchSize > 0) {
      replicateEntries(batch, index, timeout);
    }
    return batchIndex;
  }

  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
        : () -> replicateEntries(entries, batchIndex, timeout);
  }

  private String logPeerId() {
    return "[Source for peer " + this.ctx.getPeerId() + "]:";
  }
}
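
// Usage sketch (illustrative only, not part of this class): this endpoint can be named
// explicitly when adding a replication peer from the HBase shell, e.g.
//   add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2181:/hbase", ENDPOINT_CLASSNAME =>
//     'org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint'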