/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.OOMEChecker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream. Currently does not handle more than 1
 * slave cluster. For each slave cluster it selects a random number of peers using a replication
 * ratio. For example, if the replication ratio = 0.1 and the slave cluster has 100 region servers,
 * 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the peer cluster for more
 * than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationSourceLogQueue logQueue;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The id of the replication queue
  protected ReplicationQueueId queueId;
  // The start offsets. Usually only a recovered replication queue needs this, but probably when we
  // update the peer config and restart the replication peer, we also need this?
  protected ImmutableMap<String, ReplicationGroupOffset> startOffsets;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;

  private boolean abortOnError;
  // This is needed for the startup loop to identify when there's already
  // an initialization happening (but not finished yet),
  // so that it doesn't try to submit another initialize thread.
  // NOTE: this should only be set to false at the end of the initialize method, prior to return.
  private AtomicBoolean startupOngoing = new AtomicBoolean(false);
  // Flag that signals that an uncaught error happened while starting up the source
  // and a retry should be attempted
  private AtomicBoolean retryStartup = new AtomicBoolean(false);

  /**
   * A filter (or a chain of filters) for WAL entries; filters out edits.
   */
  protected volatile WALEntryFilter walEntryFilter;

  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
    new ConcurrentHashMap<>();

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * WALs to replicate. Predicate that returns 'true' for WALs to replicate and false for WALs to
   * skip.
   */
  private final Predicate<Path> filterInWALs;

  /**
   * Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits
   * we do not want replicated, passed on to replication endpoints. This is the basic set. Down in
   * #initializeWALEntryFilter this set seeds the filter chain; the filter picked up from the
   * configured endpoint and a cluster-marking filter are appended after it to create the final
   * {@link #walEntryFilter}.
   * @see WALEntryFilter
   */
  private final List<WALEntryFilter> baseFilterOutWALEntries;

  ReplicationSource() {
    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System
    // Tables.
    this(p -> !AbstractFSWALProvider.isMetaFile(p),
      Lists.newArrayList(new SystemTableWALEntryFilter()));
  }

  /**
   * @param replicateWAL            Pass a filter to run against WAL Path; filter *in* WALs to
   *                                replicate; i.e. return 'true' if you want to replicate the
   *                                content of the WAL.
   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
   *                                WALEntries so they never make it out of this ReplicationSource.
   */
  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
    this.filterInWALs = replicateWAL;
    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
  }

  /**
   * Instantiation method used by region servers
   * @param conf                  configuration to use
   * @param fs                    file system to use
   * @param manager               replication manager to ping to
   * @param queueStorage          the replication queue storage
   * @param replicationPeer       the replication peer this source replicates to
   * @param server                the server for this region server
   * @param queueData             the id and offsets of our replication queue
   * @param clusterId             unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of the WAL file which is currently being
   *                              written
   * @param metrics               metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    ReplicationQueueData queueData, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueData.getId();
    this.startOffsets = queueData.getOffsets();

    // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.walFileLengthProvider = walFileLengthProvider;

    this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);

    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

  private void decorateConf() {
    // If a replication-specific codec is configured, use it for the RPCs this source makes.
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path wal) {
    if (!this.filterInWALs.test(wal)) {
      LOG.trace("NOT replicating {}", wal);
      return;
    }
    // Use WAL prefix as the WALGroupId for this peer.
    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
    boolean queueExists = logQueue.enqueueLog(wal, walGroupId);

    if (!queueExists) {
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: it's possible that a wal is enqueued after this.running is set but before the
        // worker thread is launched, so it's necessary to check workerThreads before starting the
        // worker
        tryStartNewShipper(walGroupId);
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walGroupId, queueId);
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
    return logQueue.getQueues();
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    String peerId = replicationPeer.getId();
    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    } else {
      LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
        tableName, Bytes.toString(family), peerId);
    }
  }

  private ReplicationEndpoint createReplicationEndpoint()
    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Use the endpoint returned by the coprocessor hook in place of the configured one
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
      .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
        replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

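  // Builds the final WAL entry filter chain once the peer cluster UUID is known: the base filters
  // from construction come first, then any filter supplied by the replication endpoint, and
  // finally a ClusterMarkingEntryFilter, which marks entries with this cluster's id and drops
  // entries that have already been through the peer so we do not create replication loops.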
  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to the default filters
    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

  private long getStartOffset(String walGroupId) {
    ReplicationGroupOffset startOffset = startOffsets.get(walGroupId);
    if (startOffset == null || startOffset == ReplicationGroupOffset.BEGIN) {
      return 0L;
    }
    // this method will only be called when starting a new shipper, and we will only start a new
    // shipper when there is a new queue, so here the queue for walGroupId will never be null.
    Path first = logQueue.getQueue(walGroupId).peek();
    if (!startOffset.getWal().equals(first.getName())) {
      return 0L;
    }
    // Usually, if we arrive here, the start offset should never be -1, as that means the file has
    // already been fully replicated, so we should have filtered it out in an upper layer, usually
    // in ReplicationSourceManager. Add a warn message for safety, as replicating a bit more data
    // usually does not cause big problems.
    if (startOffset.getOffset() < 0) {
      LOG.warn("Should have already replicated wal {}, return start offset as 0",
        startOffset.getWal());
      return 0L;
    } else {
      return startOffset.getOffset();
    }
  }

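  // Creates the shipper and its WAL reader for a wal group. The reader thread is started here,
  // reading from the offset recorded for the group (0 means the beginning of the file); the
  // shipper itself is started separately via startShipper().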
  protected final ReplicationSourceShipper createNewShipper(String walGroupId) {
    ReplicationSourceWALReader walReader =
      createNewWALReader(walGroupId, getStartOffset(walGroupId));
    ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
    Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
      + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
    return worker;
  }

  protected final void startShipper(ReplicationSourceShipper worker) {
    worker.startup(this::retryRefreshing);
  }

  private void tryStartNewShipper(String walGroupId) {
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
        return value;
      } else {
        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
        ReplicationSourceShipper worker = createNewShipper(walGroupId);
        startShipper(worker);
        return worker;
      }
    });
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
      int queueSize = logQueue.getQueueSize(walGroupId);
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId()).withQueueSize(queueSize).withWalGroup(walGroupId)
        .withCurrentPath(currentPath).withCurrentPosition(shipper.getCurrentPosition())
        .withFileSize(fileSize).withAgeOfLastShippedOp(ageOfLastShippedOp)
        .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      Path archivedLogPath = findArchivedLog(currentPath, conf);
      // archivedLogPath can be null if unable to locate in archiveDir.
      if (archivedLogPath == null) {
        throw new FileNotFoundException("Couldn't find path: " + currentPath);
      }
      fileSize = fs.getContentSummary(archivedLogPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId,
    ReplicationSourceWALReader walReader) {
    return new ReplicationSourceShipper(conf, walGroupId, this, walReader);
  }

  private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
        this, walGroupId)
      : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, this,
        walGroupId);
  }

  /**
   * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
   */
  WALEntryFilter getWalEntryFilter() {
    return walEntryFilter;
  }

  // log the error, check if the error is OOME, or whether we should abort the server
  private void checkError(Thread t, Throwable error) {
    OOMEChecker.exitIfOOME(error, getClass().getSimpleName());
    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), error);
    if (abortOnError) {
      server.abort("Unexpected exception in " + t.getName(), error);
    }
  }

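  // Used as the uncaught exception handler for the wal-reader and shipper threads. After logging
  // (and possibly aborting, see checkError), it keeps asking the manager to tear down and
  // recreate the sources for this peer until the refresh succeeds or the server is shutting down.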
  private void retryRefreshing(Thread t, Throwable error) {
    checkError(t, error);
    while (true) {
      if (server.isAborted() || server.isStopped() || server.isStopping()) {
        LOG.warn("Server is shutting down, give up refreshing source for peer {}", getPeerId());
        return;
      }
      try {
        LOG.info("Refreshing replication sources now due to previous error on thread: {}",
          t.getName());
        manager.refreshSources(getPeerId());
        break;
      } catch (Exception e) {
        LOG.error("Replication sources refresh failed.", e);
        sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
      }
    }
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset throttler's cycle start tick when sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

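  // The per-peer bandwidth limit can be changed at runtime through the peer config, so it is
  // re-read on every tryThrottle call. The limit is divided by 10 when handed to the throttler,
  // presumably because the throttler accounts for pushed bytes in 100 ms cycles.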
  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : {} bandwidth throttling changed, currentBandWidth={}",
        replicationPeer.getId(), currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // User can set peer bandwidth to 0 to use default bandwidth.
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  private boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
          sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        } else {
          retryStartup.set(!this.abortOnError);
          setSourceStartupStatus(false);
          throw new RuntimeException("Exhausted retries to start replication endpoint.");
        }
      }
    }

    if (!this.isSourceActive()) {
      // this means the server is shutting down or the source is terminated, just give up
      // initializing
      setSourceStartupStatus(false);
      return;
    }

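    // Wait until the peer cluster's UUID can be fetched through the endpoint. The UUID is needed
    // below by initializeWALEntryFilter (the cluster-marking filter uses it to avoid replication
    // loops), so shippers are only started once the peer has been reachable at least once.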
    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      // this means the server is shutting down or the source is terminated, just give up
      // initializing
      setSourceStartupStatus(false);
      return;
    }

    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", logPeerId(),
      queueId, logQueue.getNumQueues(), clusterId, peerClusterId);
    initializeWALEntryFilter(peerClusterId);
    // Start workers
    startShippers();
    setSourceStartupStatus(false);
  }

  protected void startShippers() {
    for (String walGroupId : logQueue.getQueues().keySet()) {
      tryStartNewShipper(walGroupId);
    }
  }

  private synchronized void setSourceStartupStatus(boolean initializing) {
    startupOngoing.set(initializing);
    if (initializing) {
      metrics.incrSourceInitializing();
    } else {
      metrics.decrSourceInitializing();
    }
  }

  @Override
  public ReplicationSourceInterface startup() {
    if (this.sourceRunning) {
      return this;
    }
    this.sourceRunning = true;
    setSourceStartupStatus(true);
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t, e) -> {
        // if the first initialization attempt failed, and abortOnError is false, we will
        // keep looping in this thread until initialize eventually succeeds,
        // while the server main startup one can go on with its work.
        sourceRunning = false;
        checkError(t, e);
        retryStartup.set(!this.abortOnError);
        do {
          if (retryStartup.get()) {
            this.sourceRunning = true;
            setSourceStartupStatus(true);
            retryStartup.set(false);
            try {
              initialize();
            } catch (Throwable error) {
              setSourceStartupStatus(false);
              checkError(t, error);
              retryStartup.set(!this.abortOnError);
            }
          }
        } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
      });
    return this;
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  private void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error(String.format("%s Closing source %s because an error occurred: %s", logPeerId(),
        this.queueId, reason), cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // And notice that we may call terminate directly from the initThread so here we need to
      // avoid joining on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
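    // Shut down in two phases: first ask every shipper and its wal reader to stop cooperatively
    // and stop the endpoint, then (below) interrupt whatever is still alive after a grace period.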
    Collection<ReplicationSourceShipper> workers = workerThreads.values();

    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      worker.entryReader.setReaderRunning(false);
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
      if (!server.isAborted() && !server.isStopped()) {
        // If the server is running and the worker is already stopped but there were still entries
        // batched, we need to clear the buffer used for unprocessed entries
        worker.clearWALEntryBatch();
      }
    }

    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }

    // Can be null in test context.
    if (this.metrics != null) {
      if (clearMetrics) {
        this.metrics.clear();
      } else {
        this.metrics.terminate();
      }
    }
  }

  @Override
  public ReplicationQueueId getQueueId() {
    return this.queueId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return worker.isActive();
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
      .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
          .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log").append("\n");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // Reduces totalBufferUsed by deducting the shipped batchSize.
  public void postShipEdits(List<Entry> entries, long batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    this.manager.releaseBufferQuota(batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return queueId.getServerWALsBelongTo();
  }

  @Override
  public ReplicationPeer getPeer() {
    return replicationPeer;
  }

  Server getServer() {
    return server;
  }

  @Override
  public ReplicationQueueStorage getReplicationQueueStorage() {
    return queueStorage;
  }

  void removeWorker(ReplicationSourceShipper worker) {
    workerThreads.remove(worker.walGroupId, worker);
  }

  public String logPeerId() {
    return "peerId=" + this.getPeerId() + ",";
  }

  // Visible for testing purposes
  public long getTotalReplicatedEdits() {
    return totalReplicatedEdits.get();
  }
}