/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than one slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // Queues of logs to process, entries in the format walGroupId->queue,
  // each representing a queue for one wal group
  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
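  // Illustrative note (added, not from the original source): the wal group id is the
  // WAL name prefix as returned by AbstractFSWALProvider#getWALPrefixFromWALName, so,
  // assuming a hypothetical WAL file named "host%2C16020%2C1588230740.1588230740",
  // that file and all of its later rolls would queue under the
  // "host%2C16020%2C1588230740" group.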
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The znode we currently play with
  protected String queueId;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;

  private boolean abortOnError;
  // This is needed for the startup loop to identify when there's already
  // an initialization happening (but not finished yet),
  // so that it doesn't try to submit another initialize thread.
  // NOTE: this should only be set to false at the end of the initialize method, prior to return.
  private AtomicBoolean startupOngoing = new AtomicBoolean(false);
  // Flag that signals an uncaught error happening while starting up the source,
  // meaning a retry should be attempted
  private AtomicBoolean retryStartup = new AtomicBoolean(false);

  /**
   * A filter (or a chain of filters) for WAL entries; filters out edits.
   */
  protected volatile WALEntryFilter walEntryFilter;

  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
      new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
      "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * WALs to replicate.
   * Predicate that returns 'true' for WALs to replicate and false for WALs to skip.
   */
  private final Predicate<Path> filterInWALs;

  /**
   * Base WALEntry filters for this class. Unmodifiable. Set on construction.
   * Filters *out* edits we do not want replicated, passed on to replication endpoints.
   * This is the basic set. Down in #initializeWALEntryFilter this set is added to the end of
   * the WALEntry filter chain. These are put after those that we pick up from the configured
   * endpoints and other machinations to create the final {@link #walEntryFilter}.
   * @see WALEntryFilter
   */
  private final List<WALEntryFilter> baseFilterOutWALEntries;

  ReplicationSource() {
    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System
    // Tables.
    this(p -> !AbstractFSWALProvider.isMetaFile(p),
        Lists.newArrayList(new SystemTableWALEntryFilter()));
  }

  /**
   * @param replicateWAL Pass a filter to run against WAL Path; filter *in* WALs to Replicate;
   *   i.e. return 'true' if you want to replicate the content of the WAL.
   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
   *   WALEntries so they never make it out of this ReplicationSource.
   */
  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
    this.filterInWALs = replicateWAL;
    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
  }
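  // A minimal sketch (added commentary) of how the package-private constructor is meant
  // to be used; this is exactly what the no-arg constructor above delegates to:
  //
  //   ReplicationSource source = new ReplicationSource(
  //       p -> !AbstractFSWALProvider.isMetaFile(p),          // gate whole WAL files
  //       Lists.newArrayList(new SystemTableWALEntryFilter()));  // gate individual entries
  //
  // The predicate filters WALs *in*; the filter list filters entries *out*.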
  /**
   * Instantiation method used by region servers
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param queueStorage storage backing the replication queues
   * @param replicationPeer the peer this source replicates to
   * @param server the server for this region server
   * @param queueId the id of our replication queue
   * @param clusterId unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of the WAL being read
   * @param metrics metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
      MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
        this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

    // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;

    this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
        true);

    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
        replicationPeer.getId(), this.currentBandwidth);
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }
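  // Summary (added for readability, derived from init() above) of the tuning knobs this
  // source reads and their defaults:
  //
  //   hbase.replication.wait.on.endpoint.seconds   = 30    endpoint startup wait
  //   replication.source.sleepforretries           = 1000  ms, base retry sleep
  //   replication.source.maxretriesmultiplier      = 300   cap on the retry multiplier
  //   hbase.regionserver.maxlogs                   = 32    per wal-group queue size
  //   replication.source.log.queue.warn            = 2     WARN threshold for queue depth
  //   replication.source.per.peer.node.bandwidth   = 0     0 disables throttling
  //   replication.source.regionserver.abort        = true  abort the RS on source error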
  @Override
  public void enqueueLog(Path wal) {
    if (!this.filterInWALs.test(wal)) {
      LOG.trace("NOT replicating {}", wal);
      return;
    }
    // Use WAL prefix as the WALGroupId for this peer.
    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
    PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup,
          new AbstractFSWALProvider.WALStartTimeComparator());
      // make sure that we do not use an empty queue when setting up a ReplicationSource,
      // otherwise the shipper may quit immediately
      queue.put(wal);
      queues.put(walPrefix, queue);
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: it's possible that a wal is enqueued after this.running is set but before
        // the worker thread is launched, so it's necessary to check workerThreads before
        // starting the worker
        tryStartNewShipper(walPrefix, queue);
      }
    } else {
      queue.put(wal);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
          this.replicationQueueInfo.getQueueId());
    }
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new wal that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("{} WAL group {} queue size: {} exceeds value of "
          + "replication.source.log.queue.warn: {}", logPeerId(),
          walPrefix, queueSize, logQueueWarnThreshold);
    }
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerId = replicationPeer.getId();
    Set<String> namespaces = replicationPeer.getNamespaces();
    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
    if (tableCFMap != null) { // All peers with TableCFs
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
            tableName, Bytes.toString(family), peerId);
      }
    } else if (namespaces != null) { // Only for peers with namespaces configured
      if (namespaces.contains(tableName.getNamespaceAsString())) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
            tableName, Bytes.toString(family), peerId);
      }
    } else {
      // user has explicitly not defined any table cfs for replication, which means replicate
      // all the data
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }
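  // Decision sketch (added commentary, mirroring addHFileRefs above): bulk-loaded HFile
  // refs are queued for a peer when one of these holds, checked in order:
  //
  //   1. the peer declares table-CFs and the table is listed either with no CF
  //      restriction (null list) or with the edit's family among its CFs;
  //   2. the peer declares no table-CFs but does declare namespaces, and the table's
  //      namespace is among them;
  //   3. the peer restricts nothing, so everything replicates.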
  private ReplicationEndpoint createReplicationEndpoint()
      throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
            .asSubclass(ReplicationEndpoint.class)
            .getDeclaredConstructor()
            .newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
          rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Override the newly created endpoint with the one returned by the coprocessor hook
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
      throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
        .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
            clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to the default filters
    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }
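  // Added note on the resulting chain: after initializeWALEntryFilter runs, walEntryFilter
  // is a ChainWALEntryFilter over, in order, the base filter-out set from construction
  // (e.g. SystemTableWALEntryFilter), the endpoint's own filter when it provides one, and
  // finally a ClusterMarkingEntryFilter, which drops edits already tagged with the peer's
  // cluster id so that replication cycles terminate.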
  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
        return value;
      } else {
        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
        ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
        ReplicationSourceWALReader walReader =
            createNewWALReader(walGroupId, queue, worker.getStartPosition());
        Threads.setDaemonThreadRunning(
            walReader, Thread.currentThread().getName()
                + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
            (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
        worker.setWALReader(walReader);
        worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
        return worker;
      }
    });
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
      int queueSize = queues.get(walGroupId).size();
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignoring the exception as the file size of the WAL only affects the web ui",
              e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId())
          .withQueueSize(queueSize)
          .withWalGroup(walGroupId)
          .withCurrentPath(currentPath)
          .withCurrentPosition(shipper.getCurrentPosition())
          .withFileSize(fileSize)
          .withAgeOfLastShippedOp(ageOfLastShippedOp)
          .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      currentPath = getArchivedLogPath(currentPath, conf);
      fileSize = fs.getContentSummary(currentPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId,
      PriorityBlockingQueue<Path> queue) {
    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
  }

  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
        ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
        : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
  }
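  // Added note: each wal group gets its own reader/shipper pair. Serial peers (see
  // ReplicationPeerConfig#isSerial) get the SerialReplicationSourceWALReader, which holds
  // entries back until region-ordering barriers allow them through; non-serial peers get
  // the plain reader, which only preserves per-WAL order.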
  /**
   * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
   */
  WALEntryFilter getWalEntryFilter() {
    return walEntryFilter;
  }

  protected final void uncaughtException(Thread t, Throwable e,
      ReplicationSourceManager manager, String peerId) {
    RSRpcServices.exitIfOOME(e);
    LOG.error("Unexpected exception in {} currentPath={}",
        t.getName(), getCurrentPath(), e);
    if (abortOnError) {
      server.abort("Unexpected exception in " + t.getName(), e);
    }
    if (manager != null) {
      while (true) {
        try {
          LOG.info("Refreshing replication sources now due to previous error on thread: {}",
              t.getName());
          manager.refreshSources(peerId);
          break;
        } catch (IOException e1) {
          LOG.error("Replication sources refresh failed.", e1);
          sleepForRetries("Sleeping before trying to refresh sources again",
              maxRetriesMultiplier);
        }
      }
    }
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset the throttler's cycle start tick when a sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : " + peerId
          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
    }
  }
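  // Worked example (added; assumes ReplicationThrottler budgets per 100ms cycle, which is
  // why the bandwidth above is divided by 10): with
  // replication.source.per.peer.node.bandwidth = 100 MB/s, the throttler allows roughly
  // 10 MB per 100ms cycle, so an oversized batch forces a sleep before the next push.
  // A bandwidth of 0 leaves throttler.isEnabled() false and tryThrottle is a no-op.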
  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // User can set the peer bandwidth to 0 to use the default bandwidth.
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
            logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Check whether the peer is enabled or not
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return replicationPeer.isPeerEnabled();
  }
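  // Worked example (added, using the defaults read in init()): with
  // replication.source.sleepforretries = 1000 and
  // replication.source.maxretriesmultiplier = 300, a caller that bumps the multiplier on
  // each failed attempt sleeps 1s, 2s, 3s, ... up to 300s (5 minutes) between retries;
  // once the multiplier reaches 300, sleepForRetries returns false and callers stop
  // growing it.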
  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retrying", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retrying", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        } else {
          retryStartup.set(!this.abortOnError);
          this.startupOngoing.set(false);
          throw new RuntimeException("Exhausted retries to start replication endpoint.");
        }
      }
    }

    if (!this.isSourceActive()) {
      retryStartup.set(!this.abortOnError);
      this.startupOngoing.set(false);
      throw new IllegalStateException("Source should be active.");
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
              (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      retryStartup.set(!this.abortOnError);
      this.startupOngoing.set(false);
      throw new IllegalStateException("Source should be active.");
    }
    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
        logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId,
        peerClusterId);
    initializeWALEntryFilter(peerClusterId);
    // Start workers
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      tryStartNewShipper(walGroupId, queue);
    }
    this.startupOngoing.set(false);
  }

  @Override
  public ReplicationSourceInterface startup() {
    if (this.sourceRunning) {
      return this;
    }
    this.sourceRunning = true;
    startupOngoing.set(true);
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
        Thread.currentThread().getName() + ".replicationSource," + this.queueId,
        (t, e) -> {
          // if the first initialization attempt failed, and abortOnError is false, we will
          // keep looping in this thread until initialize eventually succeeds, while the
          // main server startup thread can go on with its work.
          sourceRunning = false;
          uncaughtException(t, e, null, null);
          retryStartup.set(!this.abortOnError);
          do {
            if (retryStartup.get()) {
              this.sourceRunning = true;
              startupOngoing.set(true);
              retryStartup.set(false);
              try {
                initialize();
              } catch (Throwable error) {
                sourceRunning = false;
                uncaughtException(t, error, null, null);
                retryStartup.set(!this.abortOnError);
              }
            }
          } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
        });
    return this;
  }
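  // Added lifecycle note: startup() returns immediately; all blocking work (endpoint
  // creation, peer UUID lookup, shipper start) happens on the daemon initThread. The flag
  // protocol is: startupOngoing stays true until initialize() reaches its last line, and
  // retryStartup is armed only when abortOnError is false, so a startup crash either
  // aborts the region server or re-enters initialize() via the handler's retry loop.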
  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  public void terminate(String reason, Exception cause, boolean clearMetrics,
      boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error("{} Closing source {} because an error occurred: {}",
          logPeerId(), this.queueId, reason, cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread
      // exits. And notice that we may call terminate directly from the initThread, so here
      // we need to avoid joining on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }
    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting for {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
    }

    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
              TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
              + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      // Can be null in test context.
      if (this.metrics != null) {
        this.metrics.clear();
      }
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }

  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return worker.isActive();
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }
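  // Added note on terminate()'s shutdown order: flags are flipped first (stopWorker /
  // setReaderRunning(false)) so loops can exit on their own, then the endpoint is stopped,
  // then still-alive threads get one sleepForRetries grace period before being
  // interrupted; joining on workers and awaiting endpoint termination happen only when
  // join=true.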
  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  // offsets totalBufferUsed by deducting the shipped batchSize
  @Override
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
    // Record the new buffer usage
    this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  Server getServer() {
    return server;
  }

  @Override
  public ReplicationQueueStorage getReplicationQueueStorage() {
    return queueStorage;
  }

  /**
   * @return String to use as a log prefix that contains current peerId.
   */
  private String logPeerId() {
    return "peerId=" + this.getPeerId() + ",";
  }
}