/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream. Currently does not handle more than 1
 * slave cluster. For each slave cluster it selects a random number of peers using a replication
 * ratio. For example, if replication ratio = 0.1 and slave cluster has 100 region servers, 10 will
 * be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the peer cluster for more
 * than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationSourceLogQueue logQueue;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The znode we currently play with
  protected String queueId;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;

  private boolean abortOnError;
  // This is needed for the startup loop to identify when there's already
  // an initialization happening (but not finished yet),
  // so that it doesn't try to submit another initialize thread.
  // NOTE: this should only be set to false at the end of initialize method, prior to return.
  private AtomicBoolean startupOngoing = new AtomicBoolean(false);
  // Flag that signals an uncaught error happening while starting up the source
  // and a retry should be attempted
  private AtomicBoolean retryStartup = new AtomicBoolean(false);

  /**
   * A filter (or a chain of filters) for WAL entries; filters out edits.
   */
  protected volatile WALEntryFilter walEntryFilter;

  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
    new ConcurrentHashMap<>();

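  // Manager-level counter of bytes of WAL entries currently buffered by the WAL readers across
  // the sources on this server; this source decrements it in postShipEdits() once a batch ships.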
  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * WALs to replicate. Predicate that returns 'true' for WALs to replicate and false for WALs to
   * skip.
   */
  private final Predicate<Path> filterInWALs;

  /**
   * Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits
   * we do not want replicated, passed on to replication endpoints. This is the basic set. Down in
   * #initializeWALEntryFilter this set starts the WALEntry filter chain; the filter picked up
   * from the configured endpoint and the cluster marking filter are appended after it to create
   * the final {@link #walEntryFilter}.
   * @see WALEntryFilter
   */
  private final List<WALEntryFilter> baseFilterOutWALEntries;

  ReplicationSource() {
    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System
    // Tables.
    this(p -> !AbstractFSWALProvider.isMetaFile(p),
      Lists.newArrayList(new SystemTableWALEntryFilter()));
  }

  /**
   * @param replicateWAL            Pass a filter to run against WAL Path; filter *in* WALs to
   *                                Replicate; i.e. return 'true' if you want to replicate the
   *                                content of the WAL.
   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
   *                                WALEntries so they never make it out of this ReplicationSource.
   */
  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
    this.filterInWALs = replicateWAL;
    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
  }

  /**
   * Instantiation method used by region servers
   * @param conf                  configuration to use
   * @param fs                    file system to use
   * @param manager               replication manager to ping to
   * @param queueStorage          the replication queue storage
   * @param replicationPeer       the peer we replicate to
   * @param server                the server for this region server
   * @param queueId               the id of our replication queue
   * @param clusterId             unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of the WAL file which is currently being
   *                              written
   * @param metrics               metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();

    // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;

    this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);

    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

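  /**
   * If a dedicated replication codec is configured via
   * {@link HConstants#REPLICATION_CODEC_CONF_KEY}, copy it into this source's RPC codec setting
   * so entries shipped to the peer are encoded with it (for example, a codec that preserves cell
   * tags).
   */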
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path wal) {
    if (!this.filterInWALs.test(wal)) {
      LOG.trace("NOT replicating {}", wal);
      return;
    }
    // Use WAL prefix as the WALGroupId for this peer.
    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
    boolean queueExists = logQueue.enqueueLog(wal, walPrefix);

    if (!queueExists) {
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // note: it is possible that a wal is enqueued after this.running is set but before the
        // worker thread is launched, so it is necessary to check workerThreads before starting
        // the worker
        tryStartNewShipper(walPrefix);
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
        this.replicationQueueInfo.getQueueId());
    }
  }

  @InterfaceAudience.Private
  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
    return logQueue.getQueues();
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    String peerId = replicationPeer.getId();
    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    } else {
      LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
        tableName, Bytes.toString(family), peerId);
    }
  }

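  /**
   * Creates the {@link ReplicationEndpoint} for this source: the implementation named in the peer
   * config if one is set, otherwise the default {@link HBaseInterClusterReplicationEndpoint}.
   * Region server coprocessors get a chance to replace the endpoint via
   * postCreateReplicationEndPoint.
   */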
  private ReplicationEndpoint createReplicationEndpoint()
    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Use the endpoint returned by the coprocessor hook in place of the configured one
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
      .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
        clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

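  /**
   * Starts a shipper (and its paired WAL reader thread) for the given wal group if one is not
   * already running. The compute() on workerThreads makes the check-and-start atomic, so two
   * shippers can never be started for the same group.
   */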
  private void tryStartNewShipper(String walGroupId) {
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
        return value;
      } else {
        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
        ReplicationSourceShipper worker = createNewShipper(walGroupId);
        ReplicationSourceWALReader walReader =
          createNewWALReader(walGroupId, worker.getStartPosition());
        Threads.setDaemonThreadRunning(
          walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader."
            + walGroupId + "," + queueId,
          (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
        worker.setWALReader(walReader);
        worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
        return worker;
      }
    });
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
      int queueSize = logQueue.getQueueSize(walGroupId);
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId()).withQueueSize(queueSize).withWalGroup(walGroupId)
        .withCurrentPath(currentPath).withCurrentPosition(shipper.getCurrentPosition())
        .withFileSize(fileSize).withAgeOfLastShippedOp(ageOfLastShippedOp)
        .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

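  /**
   * Returns the length of the WAL at the given path. If the file is no longer in its original
   * location because it has been moved to the archive directory, the length is looked up there
   * instead.
   */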
  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      Path archivedLogPath = findArchivedLog(currentPath, conf);
      // archivedLogPath can be null if unable to locate in archiveDir.
      if (archivedLogPath == null) {
        throw new FileNotFoundException("Couldn't find path: " + currentPath);
      }
      fileSize = fs.getContentSummary(archivedLogPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId) {
    return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
  }

  private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
        this, walGroupId)
      : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, this,
        walGroupId);
  }

  /**
   * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
   */
  WALEntryFilter getWalEntryFilter() {
    return walEntryFilter;
  }

  protected final void uncaughtException(Thread t, Throwable e, ReplicationSourceManager manager,
    String peerId) {
    RSRpcServices.exitIfOOME(e);
    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e);
    if (abortOnError) {
      server.abort("Unexpected exception in " + t.getName(), e);
    }
    if (manager != null) {
      while (true) {
        try {
          LOG.info("Refreshing replication sources now due to previous error on thread: {}",
            t.getName());
          manager.refreshSources(peerId);
          break;
        } catch (IOException e1) {
          LOG.error("Replication sources refresh failed.", e1);
          sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
        }
      }
    }
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset throttler's cycle start tick when sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

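  /**
   * Re-reads the peer's configured bandwidth and, if it differs from the value currently in use,
   * updates the throttler with the new limit.
   */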
  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : " + peerId + " bandwidth throttling changed, currentBandWidth="
        + currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // User can set peer bandwidth to 0 to use default bandwidth.
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
          sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * check whether the peer is enabled or not
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return replicationPeer.isPeerEnabled();
  }

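  /**
   * Runs in the dedicated init thread: creates and starts the replication endpoint (retrying with
   * back-off on failure), waits until the peer cluster UUID can be fetched, builds the WAL entry
   * filter chain and finally starts a shipper for every wal group already enqueued.
   */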
  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        } else {
          retryStartup.set(!this.abortOnError);
          setSourceStartupStatus(false);
          throw new RuntimeException("Exhausted retries to start replication endpoint.");
        }
      }
    }

    if (!this.isSourceActive()) {
      setSourceStartupStatus(false);
      if (Thread.currentThread().isInterrupted()) {
        // If source is not running and thread is interrupted this means someone has tried to
        // remove this peer.
        return;
      }

      retryStartup.set(!this.abortOnError);
      throw new IllegalStateException("Source should be active.");
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      setSourceStartupStatus(false);
      if (Thread.currentThread().isInterrupted()) {
        // If source is not running and thread is interrupted this means someone has tried to
        // remove this peer.
        return;
      }
      retryStartup.set(!this.abortOnError);
      throw new IllegalStateException("Source should be active.");
    }
    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", logPeerId(),
      this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId, peerClusterId);
    initializeWALEntryFilter(peerClusterId);
    // Start workers
    for (String walGroupId : logQueue.getQueues().keySet()) {
      tryStartNewShipper(walGroupId);
    }
    setSourceStartupStatus(false);
  }

  private synchronized void setSourceStartupStatus(boolean initializing) {
    startupOngoing.set(initializing);
    if (initializing) {
      metrics.incrSourceInitializing();
    } else {
      metrics.decrSourceInitializing();
    }
  }

  @Override
  public ReplicationSourceInterface startup() {
    if (this.sourceRunning) {
      return this;
    }
    this.sourceRunning = true;
    setSourceStartupStatus(true);
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t, e) -> {
        // if the first initialization attempt failed, and abortOnError is false, we will
        // keep looping in this thread until initialize eventually succeeds,
        // while the server's main startup thread can go on with its work.
        sourceRunning = false;
        uncaughtException(t, e, null, null);
        retryStartup.set(!this.abortOnError);
        do {
          if (retryStartup.get()) {
            this.sourceRunning = true;
            setSourceStartupStatus(true);
            retryStartup.set(false);
            try {
              initialize();
            } catch (Throwable error) {
              setSourceStartupStatus(false);
              uncaughtException(t, error, null, null);
              retryStartup.set(!this.abortOnError);
            }
          }
        } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
      });
    return this;
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

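  /**
   * Stops this source: waits for the init thread to exit (unless called from it), signals the
   * shipper and reader threads to stop, stops the replication endpoint and, depending on the
   * flags, joins on the workers and clears this source's metrics.
   */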
  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error(String.format("%s Closing source %s because an error occurred: %s", logPeerId(),
        this.queueId, reason), cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // Note that terminate may be called directly from the initThread, so we must avoid joining
      // on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();

    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
      if (!server.isAborted() && !server.isStopped()) {
        // If the server is running and the worker has already stopped but there were still
        // entries batched, we need to clear the buffer used for the unprocessed entries
        worker.clearWALEntryBatch();
      }
    }

    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      // Can be null in test context.
      if (this.metrics != null) {
        this.metrics.clear();
      }
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }

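  /** Returns true if at least one of this source's shipper workers is still active. */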
  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return worker.isActive();
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
      .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
          .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // offsets totalBufferUsed by deducting shipped batchSize.
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
    // Record the new buffer usage
    this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  Server getServer() {
    return server;
  }

  @Override
  public ReplicationQueueStorage getReplicationQueueStorage() {
    return queueStorage;
  }

  /** Returns String to use as a log prefix that contains current peerId. */
  public String logPeerId() {
    return "peerId=" + this.getPeerId() + ",";
  }
}