/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than 1 slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // Queues of logs to process, entry in format of walGroupId->queue,
  // each represents a queue for one wal group
  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The znode we currently play with
  protected String queueId;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries.
  protected volatile WALEntryFilter walEntryFilter;
  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
      new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * Instantiation method used by region servers
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param queueStorage the storage backing the replication queues
   * @param replicationPeer the peer this source replicates to
   * @param server the server for this region server
   * @param queueId the id of our replication queue
   * @param clusterId unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of the WAL file which is currently being
   *          written
   * @param metrics metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
      MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;
    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

  // Use the configured replication codec (if any) for the RPCs made by this source.
  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }
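
  /**
   * Add a WAL file to the queue of the WAL group it belongs to, creating the group queue (and,
   * once the source is fully started, a shipper for it) on first sight of the group.
   */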
  @Override
  public void enqueueLog(Path log) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
      queues.put(logPrefix, queue);
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: it's possible that a log is enqueued after this.running is set but before the
        // worker thread is launched, so it's necessary to check workerThreads before starting
        // the worker
        tryStartNewShipper(logPrefix, queue);
      }
    }
    queue.put(log);
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
        this.replicationQueueInfo.getQueueId());
    }
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new log that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("{} WAL group {} queue size: {} exceeds value of "
          + "replication.source.log.queue.warn: {}", logPeerId(),
        logPrefix, queueSize, logQueueWarnThreshold);
    }
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    String peerId = replicationPeer.getId();
    Set<String> namespaces = replicationPeer.getNamespaces();
    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
    if (tableCFMap != null) { // Peers with an explicit table-CF configuration
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
          tableName, Bytes.toString(family), peerId);
      }
    } else if (namespaces != null) { // Peers with an explicit namespace configuration
      if (namespaces.contains(tableName.getNamespaceAsString())) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
          tableName, Bytes.toString(family), peerId);
      }
    } else {
      // user has explicitly not defined any table cfs for replication, which means replicate all
      // the data
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }
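
  /**
   * Create the ReplicationEndpoint for this source: either the implementation class configured on
   * the peer or, when none is configured, the default HBaseInterClusterReplicationEndpoint.
   * Region server coprocessors get a chance to replace the endpoint via
   * postCreateReplicationEndPoint.
   */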
  private ReplicationEndpoint createReplicationEndpoint()
      throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
            .asSubclass(ReplicationEndpoint.class)
            .getDeclaredConstructor()
            .newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // The coprocessor hook returned a replacement; use it instead of the endpoint created
        // above
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
      throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
      .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
        clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    ArrayList<WALEntryFilter> filters =
      Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }
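
  /**
   * Start a shipper (and its WAL reader) for the given WAL group, unless another thread has
   * already registered one in {@code workerThreads} for that group.
   */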
  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
          walGroupId);
      }
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
      }
      ReplicationSourceWALReader walReader =
        createNewWALReader(walGroupId, queue, worker.getStartPosition());
      Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
        ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
      worker.setWALReader(walReader);
      worker.startup(this::uncaughtException);
    }
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
      int queueSize = queues.get(walGroupId).size();
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignoring the exception, as the WAL file size only affects the web UI", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId())
          .withQueueSize(queueSize)
          .withWalGroup(walGroupId)
          .withCurrentPath(currentPath)
          .withCurrentPosition(shipper.getCurrentPosition())
          .withFileSize(fileSize)
          .withAgeOfLastShippedOp(ageOfLastShippedOp)
          .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      // The WAL may have been moved to the archive directory; look it up there.
      currentPath = getArchivedLogPath(currentPath, conf);
      fileSize = fs.getContentSummary(currentPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId,
      PriorityBlockingQueue<Path> queue) {
    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
  }
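
  /**
   * Create the WAL reader for a group queue, picking the serial reader when the peer is
   * configured for serial replication.
   */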
  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
  }

  protected final void uncaughtException(Thread t, Throwable e) {
    RSRpcServices.exitIfOOME(e);
    LOG.error("Unexpected exception in {} currentPath={}",
      t.getName(), getCurrentPath(), e);
    server.abort("Unexpected exception in " + t.getName(), e);
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset the throttler's cycle start tick when a sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : " + peerId
          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
    }
  }
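
  /**
   * @return the bandwidth configured on the peer, or the default bandwidth if the peer value is 0
   */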
  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // user can set peer bandwidth to 0 to use default bandwidth
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Check whether the peer is enabled or not
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return replicationPeer.isPeerEnabled();
  }
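
  /**
   * Runs in the init thread started by {@link #startup()}: creates and starts the
   * ReplicationEndpoint (retrying until the source is stopped), waits for the peer cluster UUID,
   * refuses to replicate to the local cluster unless the endpoint allows it, builds the WAL entry
   * filter chain and finally starts a shipper for every known WAL group.
   */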
  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    // In a rare case, the ZooKeeper setting may be messed up. That leads to an incorrect
    // peerClusterId value, one that is the same as the source clusterId
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
      this.manager.removeSource(this);
      return;
    }
    LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
      logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

    initializeWALEntryFilter(peerClusterId);
    // start workers
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      tryStartNewShipper(walGroupId, queue);
    }
  }

  @Override
  public void startup() {
    // mark we are running now
    this.sourceRunning = true;
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
      this::uncaughtException);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }
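
  /**
   * Stop this source: interrupts the init thread, stops all shippers and readers, stops the
   * endpoint, and optionally waits for the worker threads and endpoint to shut down and clears
   * this source's metrics.
   * @param reason why the source is terminated
   * @param cause the error that requires the termination, or null for a normal shutdown
   * @param clearMetrics whether to clear the metrics of this source
   * @param join whether to wait for the workers and the endpoint to finish shutting down
   */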
  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error("{} Closing source {} because an error occurred: {}",
        logPeerId(), this.queueId, reason, cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // And notice that we may call terminate directly from the initThread so here we need to
      // avoid joining on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }
    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
              + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      this.metrics.clear();
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public UUID getPeerClusterUUID() {
    return this.clusterId;
  }

  /**
   * Comparator used to compare logs together based on their start time.
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.compare(getTS(o1), getTS(o2));
    }

    /**
     * Split a path to get the start time.
     * For example: 10.20.20.171%3A60020.1277499063250
     * @param p path to split
     * @return start time
     */
    private static long getTS(Path p) {
      int tsIndex = p.getName().lastIndexOf('.') + 1;
      return Long.parseLong(p.getName().substring(tsIndex));
    }
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }
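
  /**
   * @return true if any of this source's shipper threads is still active
   */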
  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return true;
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // Offsets totalBufferUsed by deducting the shipped batchSize.
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    totalBufferUsed.addAndGet(-batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  Server getServer() {
    return server;
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  private String logPeerId() {
    return "[Source for peer " + this.getPeerId() + "]:";
  }
}