001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog; 021 022import com.google.errorprone.annotations.RestrictedApi; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.lang.reflect.InvocationTargetException; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.List; 030import java.util.Map; 031import java.util.TreeMap; 032import java.util.UUID; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.PriorityBlockingQueue; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.TimeoutException; 037import java.util.concurrent.atomic.AtomicBoolean; 038import java.util.concurrent.atomic.AtomicLong; 039import java.util.function.Predicate; 040import org.apache.commons.lang3.StringUtils; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.Server; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.TableDescriptors; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.regionserver.HRegionServer; 051import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; 052import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; 053import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter; 054import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 055import org.apache.hadoop.hbase.replication.ReplicationException; 056import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 057import org.apache.hadoop.hbase.replication.ReplicationPeer; 058import org.apache.hadoop.hbase.replication.ReplicationQueueData; 059import org.apache.hadoop.hbase.replication.ReplicationQueueId; 060import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 061import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; 062import org.apache.hadoop.hbase.replication.WALEntryFilter; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.apache.hadoop.hbase.util.OOMEChecker; 065import org.apache.hadoop.hbase.util.Pair; 066import org.apache.hadoop.hbase.util.Threads; 067import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 068import org.apache.hadoop.hbase.wal.WAL.Entry; 069import org.apache.yetus.audience.InterfaceAudience; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 074import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 075 076/** 077 * Class that handles the source of a replication stream. Currently does not handle more than 1 078 * slave cluster. For each slave cluster it selects a random number of peers using a replication 079 * ratio. For example, if replication ration = 0.1 and slave cluster has 100 region servers, 10 will 080 * be selected. 081 * <p> 082 * A stream is considered down when we cannot contact a region server on the peer cluster for more 083 * than 55 seconds by default. 084 * </p> 085 */ 086@InterfaceAudience.Private 087public class ReplicationSource implements ReplicationSourceInterface { 088 089 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); 090 // per group queue size, keep no more than this number of logs in each wal group 091 protected int queueSizePerGroup; 092 protected ReplicationSourceLogQueue logQueue; 093 protected ReplicationQueueStorage queueStorage; 094 protected ReplicationPeer replicationPeer; 095 096 protected Configuration conf; 097 098 // The manager of all sources to which we ping back our progress 099 protected ReplicationSourceManager manager; 100 // Should we stop everything? 101 protected Server server; 102 // How long should we sleep for each retry 103 private long sleepForRetries; 104 protected FileSystem fs; 105 // id of this cluster 106 private UUID clusterId; 107 // total number of edits we replicated 108 private AtomicLong totalReplicatedEdits = new AtomicLong(0); 109 // The id of the replication queue 110 protected ReplicationQueueId queueId; 111 // The start offsets. Usually only recovered replication queue needs this, but probably when we 112 // update the peer config and restart the replication peer, we also need this? 113 protected ImmutableMap<String, ReplicationGroupOffset> startOffsets; 114 // Maximum number of retries before taking bold actions 115 private int maxRetriesMultiplier; 116 // Indicates if this particular source is running 117 volatile boolean sourceRunning = false; 118 // Metrics for this source 119 private MetricsSource metrics; 120 // ReplicationEndpoint which will handle the actual replication 121 private volatile ReplicationEndpoint replicationEndpoint; 122 123 private boolean abortOnError; 124 // This is needed for the startup loop to identify when there's already 125 // an initialization happening (but not finished yet), 126 // so that it doesn't try submit another initialize thread. 127 // NOTE: this should only be set to false at the end of initialize method, prior to return. 128 private AtomicBoolean startupOngoing = new AtomicBoolean(false); 129 // Flag that signalizes uncaught error happening while starting up the source 130 // and a retry should be attempted 131 private AtomicBoolean retryStartup = new AtomicBoolean(false); 132 133 /** 134 * A filter (or a chain of filters) for WAL entries; filters out edits. 135 */ 136 protected volatile WALEntryFilter walEntryFilter; 137 138 // throttler 139 private ReplicationThrottler throttler; 140 private long defaultBandwidth; 141 private long currentBandwidth; 142 private WALFileLengthProvider walFileLengthProvider; 143 protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads = 144 new ConcurrentHashMap<>(); 145 146 public static final String WAIT_ON_ENDPOINT_SECONDS = 147 "hbase.replication.wait.on.endpoint.seconds"; 148 public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30; 149 private int waitOnEndpointSeconds = -1; 150 151 private Thread initThread; 152 153 /** 154 * WALs to replicate. Predicate that returns 'true' for WALs to replicate and false for WALs to 155 * skip. 156 */ 157 private final Predicate<Path> filterInWALs; 158 159 /** 160 * Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits we 161 * do not want replicated, passed on to replication endpoints. This is the basic set. Down in 162 * #initializeWALEntryFilter this set is added to the end of the WALEntry filter chain. These are 163 * put after those that we pick up from the configured endpoints and other machinations to create 164 * the final {@link #walEntryFilter}. 165 * @see WALEntryFilter 166 */ 167 private final List<WALEntryFilter> baseFilterOutWALEntries; 168 169 ReplicationSource() { 170 // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables. 171 this(p -> !AbstractFSWALProvider.isMetaFile(p), 172 Lists.newArrayList(new SystemTableWALEntryFilter())); 173 } 174 175 /** 176 * @param replicateWAL Pass a filter to run against WAL Path; filter *in* WALs to 177 * Replicate; i.e. return 'true' if you want to replicate the 178 * content of the WAL. 179 * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out* 180 * WALEntries so they never make it out of this ReplicationSource. 181 */ 182 ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) { 183 this.filterInWALs = replicateWAL; 184 this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries); 185 } 186 187 /** 188 * Instantiation method used by region servers 189 * @param conf configuration to use 190 * @param fs file system to use 191 * @param manager replication manager to ping to 192 * @param server the server for this region server 193 * @param queueData the id and offsets of our replication queue 194 * @param clusterId unique UUID for the cluster 195 * @param metrics metrics for replication source 196 */ 197 @Override 198 public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, 199 ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, 200 ReplicationQueueData queueData, UUID clusterId, WALFileLengthProvider walFileLengthProvider, 201 MetricsSource metrics) throws IOException { 202 this.server = server; 203 this.conf = HBaseConfiguration.create(conf); 204 this.waitOnEndpointSeconds = 205 this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS); 206 decorateConf(); 207 // 1 second 208 this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); 209 // 5 minutes @ 1 sec per 210 this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); 211 this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); 212 this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this); 213 this.queueStorage = queueStorage; 214 this.replicationPeer = replicationPeer; 215 this.manager = manager; 216 this.fs = fs; 217 this.metrics = metrics; 218 this.clusterId = clusterId; 219 220 this.queueId = queueData.getId(); 221 this.startOffsets = queueData.getOffsets(); 222 223 // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling. 224 defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0); 225 currentBandwidth = getCurrentBandwidth(); 226 this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); 227 this.walFileLengthProvider = walFileLengthProvider; 228 229 this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true); 230 231 LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, 232 replicationPeer.getId(), this.currentBandwidth); 233 } 234 235 private void decorateConf() { 236 String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY); 237 if (StringUtils.isNotEmpty(replicationCodec)) { 238 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec); 239 } 240 } 241 242 @Override 243 public void enqueueLog(Path wal) { 244 if (!this.filterInWALs.test(wal)) { 245 LOG.trace("NOT replicating {}", wal); 246 return; 247 } 248 // Use WAL prefix as the WALGroupId for this peer. 249 String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName()); 250 boolean queueExists = logQueue.enqueueLog(wal, walGroupId); 251 252 if (!queueExists) { 253 if (this.isSourceActive() && this.walEntryFilter != null) { 254 // new wal group observed after source startup, start a new worker thread to track it 255 // notice: it's possible that wal enqueued when this.running is set but worker thread 256 // still not launched, so it's necessary to check workerThreads before start the worker 257 tryStartNewShipper(walGroupId); 258 } 259 } 260 if (LOG.isTraceEnabled()) { 261 LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walGroupId, queueId); 262 } 263 } 264 265 @RestrictedApi(explanation = "Should only be called in tests", link = "", 266 allowedOnPath = ".*/src/test/.*") 267 public Map<String, PriorityBlockingQueue<Path>> getQueues() { 268 return logQueue.getQueues(); 269 } 270 271 @Override 272 public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) 273 throws ReplicationException { 274 String peerId = replicationPeer.getId(); 275 if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) { 276 this.queueStorage.addHFileRefs(peerId, pairs); 277 metrics.incrSizeOfHFileRefsQueue(pairs.size()); 278 } else { 279 LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}", 280 tableName, Bytes.toString(family), peerId); 281 } 282 } 283 284 private ReplicationEndpoint createReplicationEndpoint() 285 throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { 286 RegionServerCoprocessorHost rsServerHost = null; 287 if (server instanceof HRegionServer) { 288 rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); 289 } 290 String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl(); 291 292 ReplicationEndpoint replicationEndpoint; 293 if (replicationEndpointImpl == null) { 294 // Default to HBase inter-cluster replication endpoint; skip reflection 295 replicationEndpoint = new HBaseInterClusterReplicationEndpoint(); 296 } else { 297 try { 298 replicationEndpoint = Class.forName(replicationEndpointImpl) 299 .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance(); 300 } catch (NoSuchMethodException | InvocationTargetException e) { 301 throw new IllegalArgumentException(e); 302 } 303 } 304 if (rsServerHost != null) { 305 ReplicationEndpoint newReplicationEndPoint = 306 rsServerHost.postCreateReplicationEndPoint(replicationEndpoint); 307 if (newReplicationEndPoint != null) { 308 // Override the newly created endpoint from the hook with configured end point 309 replicationEndpoint = newReplicationEndPoint; 310 } 311 } 312 return replicationEndpoint; 313 } 314 315 private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint) 316 throws IOException, TimeoutException { 317 TableDescriptors tableDescriptors = null; 318 if (server instanceof HRegionServer) { 319 tableDescriptors = ((HRegionServer) server).getTableDescriptors(); 320 } 321 replicationEndpoint 322 .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs, 323 replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server)); 324 replicationEndpoint.start(); 325 replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS); 326 } 327 328 private void initializeWALEntryFilter(UUID peerClusterId) { 329 // get the WALEntryFilter from ReplicationEndpoint and add it to default filters 330 List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries); 331 WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); 332 if (filterFromEndpoint != null) { 333 filters.add(filterFromEndpoint); 334 } 335 filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); 336 this.walEntryFilter = new ChainWALEntryFilter(filters); 337 this.walEntryFilter.setSerial(replicationPeer.getPeerConfig().isSerial()); 338 } 339 340 private long getStartOffset(String walGroupId) { 341 ReplicationGroupOffset startOffset = startOffsets.get(walGroupId); 342 if (startOffset == null || startOffset == ReplicationGroupOffset.BEGIN) { 343 return 0L; 344 } 345 // this method will only be called when start new shipper, and we will only start new shipper 346 // when there is a new queue, so here the queue for walGroupId will never be null. 347 Path first = logQueue.getQueue(walGroupId).peek(); 348 if (!startOffset.getWal().equals(first.getName())) { 349 return 0L; 350 } 351 // Usually, if we arrive here, the start offset should never be -1, as it means this file has 352 // been fully replicated so we should have filtered it out in upper layer, usually in 353 // ReplicationSourceManager. Add a warn message for safety, as usually replicate more data will 354 // not cause big problems. 355 if (startOffset.getOffset() < 0) { 356 LOG.warn("Should have already replicated wal {}, return start offset as 0", 357 startOffset.getWal()); 358 return 0L; 359 } else { 360 return startOffset.getOffset(); 361 } 362 } 363 364 protected final ReplicationSourceShipper createNewShipper(String walGroupId) { 365 ReplicationSourceWALReader walReader = 366 createNewWALReader(walGroupId, getStartOffset(walGroupId)); 367 ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader); 368 Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() 369 + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing); 370 return worker; 371 } 372 373 protected final void startShipper(ReplicationSourceShipper worker) { 374 worker.startup(this::retryRefreshing); 375 } 376 377 private void tryStartNewShipper(String walGroupId) { 378 workerThreads.compute(walGroupId, (key, value) -> { 379 if (value != null) { 380 LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId); 381 return value; 382 } else { 383 LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId); 384 ReplicationSourceShipper worker = createNewShipper(walGroupId); 385 startShipper(worker); 386 return worker; 387 } 388 }); 389 } 390 391 @Override 392 public Map<String, ReplicationStatus> getWalGroupStatus() { 393 Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>(); 394 long ageOfLastShippedOp, replicationDelay, fileSize; 395 for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) { 396 String walGroupId = walGroupShipper.getKey(); 397 ReplicationSourceShipper shipper = walGroupShipper.getValue(); 398 ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId); 399 int queueSize = logQueue.getQueueSize(walGroupId); 400 replicationDelay = metrics.getReplicationDelay(); 401 Path currentPath = shipper.getCurrentPath(); 402 fileSize = -1; 403 if (currentPath != null) { 404 try { 405 fileSize = getFileSize(currentPath); 406 } catch (IOException e) { 407 LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e); 408 } 409 } else { 410 currentPath = new Path("NO_LOGS_IN_QUEUE"); 411 LOG.warn("{} No replication ongoing, waiting for new log", logPeerId()); 412 } 413 ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder(); 414 statusBuilder.withPeerId(this.getPeerId()).withQueueSize(queueSize).withWalGroup(walGroupId) 415 .withCurrentPath(currentPath).withCurrentPosition(shipper.getCurrentPosition()) 416 .withFileSize(fileSize).withAgeOfLastShippedOp(ageOfLastShippedOp) 417 .withReplicationDelay(replicationDelay); 418 sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build()); 419 } 420 return sourceReplicationStatus; 421 } 422 423 private long getFileSize(Path currentPath) throws IOException { 424 long fileSize; 425 try { 426 fileSize = fs.getContentSummary(currentPath).getLength(); 427 } catch (FileNotFoundException e) { 428 Path archivedLogPath = findArchivedLog(currentPath, conf); 429 // archivedLogPath can be null if unable to locate in archiveDir. 430 if (archivedLogPath == null) { 431 throw new FileNotFoundException("Couldn't find path: " + currentPath); 432 } 433 fileSize = fs.getContentSummary(archivedLogPath).getLength(); 434 } 435 return fileSize; 436 } 437 438 protected ReplicationSourceShipper createNewShipper(String walGroupId, 439 ReplicationSourceWALReader walReader) { 440 return new ReplicationSourceShipper(conf, walGroupId, this, walReader); 441 } 442 443 private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) { 444 return replicationPeer.getPeerConfig().isSerial() 445 ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, 446 this, walGroupId) 447 : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, this, 448 walGroupId); 449 } 450 451 /** 452 * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null. 453 * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits. 454 */ 455 WALEntryFilter getWalEntryFilter() { 456 return walEntryFilter; 457 } 458 459 // log the error, check if the error is OOME, or whether we should abort the server 460 private void checkError(Thread t, Throwable error) { 461 OOMEChecker.exitIfOOME(error, getClass().getSimpleName()); 462 LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), error); 463 if (abortOnError) { 464 server.abort("Unexpected exception in " + t.getName(), error); 465 } 466 } 467 468 private void retryRefreshing(Thread t, Throwable error) { 469 checkError(t, error); 470 while (true) { 471 if (server.isAborted() || server.isStopped() || server.isStopping()) { 472 LOG.warn("Server is shutting down, give up refreshing source for peer {}", getPeerId()); 473 return; 474 } 475 try { 476 LOG.info("Refreshing replication sources now due to previous error on thread: {}", 477 t.getName()); 478 manager.refreshSources(getPeerId()); 479 break; 480 } catch (Exception e) { 481 LOG.error("Replication sources refresh failed.", e); 482 sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier); 483 } 484 } 485 } 486 487 @Override 488 public ReplicationEndpoint getReplicationEndpoint() { 489 return this.replicationEndpoint; 490 } 491 492 @Override 493 public ReplicationSourceManager getSourceManager() { 494 return this.manager; 495 } 496 497 @Override 498 public void tryThrottle(int batchSize) throws InterruptedException { 499 checkBandwidthChangeAndResetThrottler(); 500 if (throttler.isEnabled()) { 501 long sleepTicks = throttler.getNextSleepInterval(batchSize); 502 if (sleepTicks > 0) { 503 if (LOG.isTraceEnabled()) { 504 LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks); 505 } 506 Thread.sleep(sleepTicks); 507 // reset throttler's cycle start tick when sleep for throttling occurs 508 throttler.resetStartTick(); 509 } 510 } 511 } 512 513 private void checkBandwidthChangeAndResetThrottler() { 514 long peerBandwidth = getCurrentBandwidth(); 515 if (peerBandwidth != currentBandwidth) { 516 currentBandwidth = peerBandwidth; 517 throttler.setBandwidth((double) currentBandwidth / 10.0); 518 LOG.info("ReplicationSource : {} bandwidth throttling changed, currentBandWidth={}", 519 replicationPeer.getId(), currentBandwidth); 520 } 521 } 522 523 private long getCurrentBandwidth() { 524 long peerBandwidth = replicationPeer.getPeerBandwidth(); 525 // User can set peer bandwidth to 0 to use default bandwidth. 526 return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; 527 } 528 529 /** 530 * Do the sleeping logic 531 * @param msg Why we sleep 532 * @param sleepMultiplier by how many times the default sleeping time is augmented 533 * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code> 534 */ 535 private boolean sleepForRetries(String msg, int sleepMultiplier) { 536 try { 537 if (LOG.isTraceEnabled()) { 538 LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries, 539 sleepMultiplier); 540 } 541 Thread.sleep(this.sleepForRetries * sleepMultiplier); 542 } catch (InterruptedException e) { 543 if (LOG.isDebugEnabled()) { 544 LOG.debug("{} Interrupted while sleeping between retries", logPeerId()); 545 } 546 Thread.currentThread().interrupt(); 547 } 548 return sleepMultiplier < maxRetriesMultiplier; 549 } 550 551 private void initialize() { 552 int sleepMultiplier = 1; 553 while (this.isSourceActive()) { 554 ReplicationEndpoint replicationEndpoint; 555 try { 556 replicationEndpoint = createReplicationEndpoint(); 557 } catch (Exception e) { 558 LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e); 559 if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) { 560 sleepMultiplier++; 561 } 562 continue; 563 } 564 565 try { 566 initAndStartReplicationEndpoint(replicationEndpoint); 567 this.replicationEndpoint = replicationEndpoint; 568 break; 569 } catch (Exception e) { 570 LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e); 571 replicationEndpoint.stop(); 572 if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) { 573 sleepMultiplier++; 574 } else { 575 retryStartup.set(!this.abortOnError); 576 setSourceStartupStatus(false); 577 throw new RuntimeException("Exhausted retries to start replication endpoint."); 578 } 579 } 580 } 581 582 if (!this.isSourceActive()) { 583 // this means the server is shutting down or the source is terminated, just give up 584 // initializing 585 setSourceStartupStatus(false); 586 return; 587 } 588 589 sleepMultiplier = 1; 590 UUID peerClusterId; 591 // delay this until we are in an asynchronous thread 592 for (;;) { 593 peerClusterId = replicationEndpoint.getPeerUUID(); 594 if (this.isSourceActive() && peerClusterId == null) { 595 if (LOG.isDebugEnabled()) { 596 LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(), 597 (this.sleepForRetries * sleepMultiplier)); 598 } 599 if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { 600 sleepMultiplier++; 601 } 602 } else { 603 break; 604 } 605 } 606 607 if (!this.isSourceActive()) { 608 // this means the server is shutting down or the source is terminated, just give up 609 // initializing 610 setSourceStartupStatus(false); 611 return; 612 } 613 614 LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", logPeerId(), 615 queueId, logQueue.getNumQueues(), clusterId, peerClusterId); 616 initializeWALEntryFilter(peerClusterId); 617 // Start workers 618 startShippers(); 619 setSourceStartupStatus(false); 620 } 621 622 protected void startShippers() { 623 for (String walGroupId : logQueue.getQueues().keySet()) { 624 tryStartNewShipper(walGroupId); 625 } 626 } 627 628 private synchronized void setSourceStartupStatus(boolean initializing) { 629 startupOngoing.set(initializing); 630 if (initializing) { 631 metrics.incrSourceInitializing(); 632 } else { 633 metrics.decrSourceInitializing(); 634 } 635 } 636 637 @Override 638 public ReplicationSourceInterface startup() { 639 if (this.sourceRunning) { 640 return this; 641 } 642 this.sourceRunning = true; 643 setSourceStartupStatus(true); 644 initThread = new Thread(this::initialize); 645 Threads.setDaemonThreadRunning(initThread, 646 Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t, e) -> { 647 // if first initialization attempt failed, and abortOnError is false, we will 648 // keep looping in this thread until initialize eventually succeeds, 649 // while the server main startup one can go on with its work. 650 sourceRunning = false; 651 checkError(t, e); 652 retryStartup.set(!this.abortOnError); 653 do { 654 if (retryStartup.get()) { 655 this.sourceRunning = true; 656 setSourceStartupStatus(true); 657 retryStartup.set(false); 658 try { 659 initialize(); 660 } catch (Throwable error) { 661 setSourceStartupStatus(false); 662 checkError(t, error); 663 retryStartup.set(!this.abortOnError); 664 } 665 } 666 } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError); 667 }); 668 return this; 669 } 670 671 @Override 672 public void terminate(String reason) { 673 terminate(reason, null); 674 } 675 676 @Override 677 public void terminate(String reason, Exception cause) { 678 terminate(reason, cause, true); 679 } 680 681 @Override 682 public void terminate(String reason, Exception cause, boolean clearMetrics) { 683 terminate(reason, cause, clearMetrics, true); 684 } 685 686 private void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) { 687 if (cause == null) { 688 LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason); 689 } else { 690 LOG.error(String.format("%s Closing source %s because an error occurred: %s", logPeerId(), 691 this.queueId, reason), cause); 692 } 693 this.sourceRunning = false; 694 if (initThread != null && Thread.currentThread() != initThread) { 695 // This usually won't happen but anyway, let's wait until the initialization thread exits. 696 // And notice that we may call terminate directly from the initThread so here we need to 697 // avoid join on ourselves. 698 initThread.interrupt(); 699 Threads.shutdown(initThread, this.sleepForRetries); 700 } 701 Collection<ReplicationSourceShipper> workers = workerThreads.values(); 702 703 for (ReplicationSourceShipper worker : workers) { 704 worker.stopWorker(); 705 worker.entryReader.setReaderRunning(false); 706 } 707 708 if (this.replicationEndpoint != null) { 709 this.replicationEndpoint.stop(); 710 } 711 712 for (ReplicationSourceShipper worker : workers) { 713 if (worker.isAlive() || worker.entryReader.isAlive()) { 714 try { 715 // Wait worker to stop 716 Thread.sleep(this.sleepForRetries); 717 } catch (InterruptedException e) { 718 LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName()); 719 Thread.currentThread().interrupt(); 720 } 721 // If worker still is alive after waiting, interrupt it 722 if (worker.isAlive()) { 723 worker.interrupt(); 724 } 725 // If entry reader is alive after waiting, interrupt it 726 if (worker.entryReader.isAlive()) { 727 worker.entryReader.interrupt(); 728 } 729 } 730 if (!server.isAborted() && !server.isStopped()) { 731 // If server is running and worker is already stopped but there was still entries batched, 732 // we need to clear buffer used for non processed entries 733 worker.clearWALEntryBatch(); 734 } 735 } 736 737 if (join) { 738 for (ReplicationSourceShipper worker : workers) { 739 Threads.shutdown(worker, this.sleepForRetries); 740 LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName()); 741 } 742 if (this.replicationEndpoint != null) { 743 try { 744 this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, 745 TimeUnit.MILLISECONDS); 746 } catch (TimeoutException te) { 747 LOG.warn("{} Got exception while waiting for endpoint to shutdown " 748 + "for replication source : {}", logPeerId(), this.queueId, te); 749 } 750 } 751 } 752 753 // Can be null in test context. 754 if (this.metrics != null) { 755 if (clearMetrics) { 756 this.metrics.clear(); 757 } else { 758 this.metrics.terminate(); 759 } 760 } 761 } 762 763 @Override 764 public ReplicationQueueId getQueueId() { 765 return this.queueId; 766 } 767 768 @Override 769 public Path getCurrentPath() { 770 // only for testing 771 for (ReplicationSourceShipper worker : workerThreads.values()) { 772 if (worker.getCurrentPath() != null) { 773 return worker.getCurrentPath(); 774 } 775 } 776 return null; 777 } 778 779 @Override 780 public boolean isSourceActive() { 781 return !this.server.isStopped() && this.sourceRunning; 782 } 783 784 public boolean isWorkerRunning() { 785 for (ReplicationSourceShipper worker : this.workerThreads.values()) { 786 if (worker.isActive()) { 787 return worker.isActive(); 788 } 789 } 790 return false; 791 } 792 793 @Override 794 public String getStats() { 795 StringBuilder sb = new StringBuilder(); 796 sb.append("Total replicated edits: ").append(totalReplicatedEdits) 797 .append(", current progress: \n"); 798 for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) { 799 String walGroupId = entry.getKey(); 800 ReplicationSourceShipper worker = entry.getValue(); 801 long position = worker.getCurrentPosition(); 802 Path currentPath = worker.getCurrentPath(); 803 sb.append("walGroup [").append(walGroupId).append("]: "); 804 if (currentPath != null) { 805 sb.append("currently replicating from: ").append(currentPath).append(" at position: ") 806 .append(position).append("\n"); 807 } else { 808 sb.append("no replication ongoing, waiting for new log").append("\n"); 809 } 810 } 811 return sb.toString(); 812 } 813 814 @Override 815 public MetricsSource getSourceMetrics() { 816 return this.metrics; 817 } 818 819 @Override 820 // offsets totalBufferUsed by deducting shipped batchSize. 821 public void postShipEdits(List<Entry> entries, long batchSize) { 822 if (throttler.isEnabled()) { 823 throttler.addPushSize(batchSize); 824 } 825 totalReplicatedEdits.addAndGet(entries.size()); 826 this.manager.releaseBufferQuota(batchSize); 827 } 828 829 @Override 830 public WALFileLengthProvider getWALFileLengthProvider() { 831 return walFileLengthProvider; 832 } 833 834 @Override 835 public ServerName getServerWALsBelongTo() { 836 return queueId.getServerWALsBelongTo(); 837 } 838 839 @Override 840 public ReplicationPeer getPeer() { 841 return replicationPeer; 842 } 843 844 Server getServer() { 845 return server; 846 } 847 848 @Override 849 public ReplicationQueueStorage getReplicationQueueStorage() { 850 return queueStorage; 851 } 852 853 void removeWorker(ReplicationSourceShipper worker) { 854 workerThreads.remove(worker.walGroupId, worker); 855 } 856 857 public String logPeerId() { 858 return "peerId=" + this.getPeerId() + ","; 859 } 860 861 // Visible for testing purpose 862 public long getTotalReplicatedEdits() { 863 return totalReplicatedEdits.get(); 864 } 865}