/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than 1 slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
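 * <p>
 * A rough lifecycle sketch (simplified; in practice {@link ReplicationSourceManager} creates the
 * source and drives these calls, and the argument names below are only placeholders):
 * </p>
 * <pre>
 * ReplicationSourceInterface source = new ReplicationSource();
 * source.init(conf, fs, manager, queueStorage, peer, server, queueId, clusterId,
 *   walFileLengthProvider, metrics);
 * source.enqueueLog(walPath); // WALs are grouped by prefix; one queue and shipper per group
 * source.startup();           // asynchronously creates the endpoint, then starts the shippers
 * source.terminate("region server shutting down");
 * </pre>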
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // Queues of logs to process, entry in format of walGroupId->queue,
  // each represents a queue for one wal group
  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The znode we currently play with
  protected String queueId;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries.
  protected volatile WALEntryFilter walEntryFilter;
  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
    new ConcurrentHashMap<>();

  private AtomicLong totalBufferUsed;

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * Instantiation method used by region servers
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to ping to
   * @param queueStorage the replication queue storage
   * @param replicationPeer the replication peer this source replicates to
   * @param server the server for this region server
   * @param queueId the id of our replication queue
   * @param clusterId unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of the WAL file currently being written
   * @param metrics metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
      MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);

    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.totalBufferUsed = manager.getTotalBufferUsed();
    this.walFileLengthProvider = walFileLengthProvider;
    LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
        + ", currentBandwidth=" + this.currentBandwidth);
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path log) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
      queues.put(logPrefix, queue);
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: it's possible that a log is enqueued after this.running is set but before the
        // worker thread is launched, so it's necessary to check workerThreads before starting
        // a new worker
        tryStartNewShipper(logPrefix, queue);
      }
    }
    queue.put(log);
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
        this.replicationQueueInfo.getQueueId());
    }
    this.metrics.incrSizeOfLogQueue();
    // This will log a warning for each new log that gets created above the warn threshold
    int queueSize = queue.size();
    if (queueSize > this.logQueueWarnThreshold) {
      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
    }
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
    if (tableCFMap != null) {
      List<String> tableCfs = tableCFMap.get(tableName);
      if (tableCFMap.containsKey(tableName)
          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
        this.queueStorage.addHFileRefs(peerId, pairs);
        metrics.incrSizeOfHFileRefsQueue(pairs.size());
      } else {
        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
          tableName, Bytes.toString(family), peerId);
      }
    } else {
      // The user has not defined any table CFs for replication, which means all the data should
      // be replicated
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    }
  }

  private ReplicationEndpoint createReplicationEndpoint()
      throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to the HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
            .asSubclass(ReplicationEndpoint.class)
            .getDeclaredConstructor()
            .newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Use the endpoint returned by the coprocessor hook in place of the one just created
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
      throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
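    // Hand the endpoint everything it needs (peer configuration, source cluster id, metrics,
    // table descriptors), then bound the startup wait by
    // hbase.replication.wait.on.endpoint.seconds.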
    replicationEndpoint.init(new ReplicationEndpoint.Context(conf,
        replicationPeer.getConfiguration(), fs, peerId, clusterId, replicationPeer, metrics,
        tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    ArrayList<WALEntryFilter> filters =
      Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Someone beat us to starting a worker thread for wal group {}", logPeerId(),
          walGroupId);
      }
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
      }
      ReplicationSourceWALReader walReader =
        createNewWALReader(walGroupId, queue, worker.getStartPosition());
      Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
        ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
      worker.setWALReader(walReader);
      worker.startup(this::uncaughtException);
    }
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
      int queueSize = queues.get(walGroupId).size();
      replicationDelay =
        ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignoring the exception as the WAL file size only affects the web UI", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId())
          .withQueueSize(queueSize)
          .withWalGroup(walGroupId)
          .withCurrentPath(currentPath)
          .withCurrentPosition(shipper.getCurrentPosition())
          .withFileSize(fileSize)
          .withAgeOfLastShippedOp(ageOfLastShippedOp)
          .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

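  /**
   * Size of the WAL file at <code>currentPath</code>; if the file has already been moved to the
   * archive directory, look it up there instead.
   */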
  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      currentPath = getArchivedLogPath(currentPath, conf);
      fileSize = fs.getContentSummary(currentPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId,
      PriorityBlockingQueue<Path> queue) {
    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
  }

  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
  }

  protected final void uncaughtException(Thread t, Throwable e) {
    RSRpcServices.exitIfOOME(e);
    LOG.error("Unexpected exception in {} currentPath={}",
      t.getName(), getCurrentPath(), e);
    server.abort("Unexpected exception in " + t.getName(), e);
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset throttler's cycle start tick when sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : " + peerId
          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
    }
  }

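  /**
   * Bandwidth to use for throttling: the peer-level value when it is non-zero, otherwise the
   * source-wide default from replication.source.per.peer.node.bandwidth.
   */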
  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // user can set peer bandwidth to 0 to use default bandwidth
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}",
          logPeerId(), msg, sleepForRetries, sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Check whether the peer is enabled or not
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return replicationPeer.isPeerEnabled();
  }

  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} Error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      return;
    }

    // In rare cases the zookeeper setting may be messed up. That leads to an incorrect
    // peerClusterId value, one that is the same as the source clusterId
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
      this.manager.removeSource(this);
      return;
    }
    LOG.info("{} Source: {} is now replicating from cluster {} to peer cluster {}",
      logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

    initializeWALEntryFilter(peerClusterId);
    // start workers
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      tryStartNewShipper(walGroupId, queue);
    }
  }

  @Override
  public void startup() {
    // mark we are running now
    this.sourceRunning = true;
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId,
      this::uncaughtException);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error("{} Closing source {} because an error occurred: {}",
        logPeerId(), this.queueId, reason, cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen, but let's wait for the initialization thread to exit anyway.
      // Notice that terminate may be called directly from the initThread, so we must take care
      // not to join on ourselves.
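      // The init thread may be sleeping between retries or blocked starting the endpoint;
      // interrupting it lets it notice that sourceRunning is now false and exit promptly.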
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();
    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      worker.entryReader.setReaderRunning(false);
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }
    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shut down "
              + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }
    if (clearMetrics) {
      this.metrics.clear();
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  /**
   * Comparator used to compare logs based on their start time
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.compare(getTS(o1), getTS(o2));
    }

    /**
     * Split a path to get the start time
     * For example: 10.20.20.171%3A60020.1277499063250
     * @param p path to split
     * @return start time
     */
    private static long getTS(Path p) {
      int tsIndex = p.getName().lastIndexOf('.') + 1;
      return Long.parseLong(p.getName().substring(tsIndex));
    }
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // Offsets totalBufferUsed by deducting the shipped batchSize.
  public void postShipEdits(List<Entry> entries, int batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    totalBufferUsed.addAndGet(-batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  Server getServer() {
    return server;
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  private String logPeerId() {
    return "[Source for peer " + this.getPeerId() + "]:";
  }
}