/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AbstractWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and our only goal is to finish
 * replicating the WAL queue it had</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
 * in order to transfer all the queues in a local old source.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
 * there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
 * operations.</li>
 * <li>Need to synchronize on {@link #walsById}. There are four methods which modify it,
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #postLogRoll(Path)}. {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer
 * id in {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
 * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
 * is called by {@link ReplicationSourceInterface}, so there is no race with
 * {@link #addPeer(String)}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first, then remove the wals from {@link #walsById}, so there
 * is no race with {@link #removePeer(String)}. The only case that needs synchronization is between
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #postLogRoll(Path)}.</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it, {@link #removePeer(String)},
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #claimQueue(ReplicationQueueId)}.
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first, then remove the wals from
 * {@link #walsByIdRecoveredQueues}. And {@link #claimQueue(ReplicationQueueId)} will add the wals
 * to {@link #walsByIdRecoveredQueues} first, then start up a {@link ReplicationSourceInterface}.
 * So there is no race here. For {@link #claimQueue(ReplicationQueueId)} and
 * {@link #removePeer(String)}, we already synchronize on {@link #oldsources}, so there is no need
 * to synchronize on {@link #walsByIdRecoveredQueues}.</li>
 * <li>Need to synchronize on {@link #latestPaths} so that a newly opened source does not miss a
 * new log.</li>
 * <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
 */
@InterfaceAudience.Private
public class ReplicationSourceManager {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // all the sources that read this RS's logs and every peer only has one replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;

  /**
   * Storage for queues that need persistence; e.g. replication state so it can be recovered after
   * a crash. queueStorage upkeep is spread about this class and passed to ReplicationSource
   * instances for these to do updates themselves. Not all ReplicationSource instances keep state.
   */
  private final ReplicationQueueStorage queueStorage;

  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<String>>> walsById;
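
  // Illustrative sketch (not taken from this code base; the server name, peer id and wal names
  // are hypothetical) of the walsById index structure described above:
  //   ReplicationQueueId(rs1.example.org,16020,1688000000000 - peer "2")
  //     -> "rs1.example.org%2C16020%2C1688000000000"
  //       -> { "rs1.example.org%2C16020%2C1688000000000.1688000001111",
  //            "rs1.example.org%2C16020%2C1688000000000.1688000002222" }
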
  // Logs for recovered sources we are currently tracking
  // the map is: queue_id->logPrefix/logGroup->logs
  // for recovered source, the WAL files should already have been moved to oldLogDir, and we have
  // different layout of old WAL files, for example, with server name sub directories or not, so
  // here we record the full path instead of just the name, so when refreshing we can enqueue the
  // WAL file again, without trying to guess the real path of the WAL files.
  private final ConcurrentMap<ReplicationQueueId,
    Map<String, NavigableSet<Path>>> walsByIdRecoveredQueues;

  private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private final Map<String, Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFactory walFactory;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private AtomicLong totalBufferUsed = new AtomicLong();

  // How long should we sleep for each retry when deleting remote wal files for sync replication
  // peer.
  private final long sleepForRetries;
  // Maximum number of retries before taking bold actions when deleting remote wal files for sync
  // replication peer.
  private final int maxRetriesMultiplier;
  // Total buffer size on this RegionServer for holding batched edits to be shipped.
  private final long totalBufferLimit;
  private final MetricsReplicationGlobalSourceSource globalMetrics;

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param queueStorage the interface for manipulating replication queues
   * @param conf         the configuration to use
   * @param server       the server for this region server
   * @param fs           the file system to use
   * @param logDir       the directory that contains all wal directories of live RSs
   * @param oldLogDir    the directory where old logs are archived
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
    ReplicationPeers replicationPeers, Configuration conf, Server server, FileSystem fs,
    Path logDir, Path oldLogDir, UUID clusterId, WALFactory walFactory,
    SyncReplicationPeerMappingManager syncReplicationPeerMappingManager,
    MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    // 30 seconds
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
    this.clusterId = clusterId;
    this.walFactory = walFactory;
    this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashMap<>();
    this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
    this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.globalMetrics = globalMetrics;
  }
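
  // A minimal sketch, under the assumption that the defaults read by the constructor above need
  // tuning, of how the same configuration keys could be overridden programmatically. The values
  // are illustrative only, not recommendations; the keys can equally be set in hbase-site.xml.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong("replication.sleep.before.failover", 10000); // shorten the failover wait
  //   conf.setInt("replication.executor.workers", 2);           // allow two queue-claim tasks
  //   conf.setLong("replication.source.sync.sleepforretries", 1000);
  //   conf.setInt("replication.source.sync.maxretriesmultiplier", 60);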

  /**
   * Adds a normal source per registered peer cluster.
   */
  void init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id, true);
    }
  }

  /**
   * <ol>
   * <li>Add peer to replicationPeers</li>
   * <li>Add the normal source and related replication queue</li>
   * <li>Add HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId, false);
    }
  }

  /**
   * <ol>
   * <li>Remove peer from replicationPeers</li>
   * <li>Remove all the recovered sources for the specified id and related replication queues</li>
   * <li>Remove the normal source and related replication queue</li>
   * <li>Remove HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    ReplicationPeer peer = replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG.info("Number of deleted recovered sources for {}: {}", peerId, oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    }
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    if (peerConfig.isSyncReplication()) {
      syncReplicationPeerMappingManager.remove(peerId, peerConfig);
    }
  }
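
  // Hypothetical illustration (client-side code, not part of this class) of the operations that
  // eventually reach addPeer()/removePeer() above via master procedures and
  // PeerProcedureHandlerImpl; the peer id and cluster key are made up:
  //
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //     Admin admin = conn.getAdmin()) {
  //     admin.addReplicationPeer("2", ReplicationPeerConfig.newBuilder()
  //       .setClusterKey("zk1,zk2,zk3:2181:/hbase").build());
  //     // ... later ...
  //     admin.removeReplicationPeer("2");
  //   }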

  /**
   * @return a new 'classic' user-space replication source.
   * @param queueData the replication queue data to associate the ReplicationSource with.
   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
   */
  private ReplicationSourceInterface createSource(ReplicationQueueData queueData,
    ReplicationPeer replicationPeer) throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueData.getId());
    // Init the just created replication source. Pass the default walProvider's wal file length
    // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
    // replication, see #createCatalogReplicationSource().
    WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null
      ? this.walFactory.getWALProvider().getWALFileLengthProvider()
      : p -> OptionalLong.empty();

    // Create merged configuration with peer overrides as higher priority and
    // global config as lower priority
    Configuration mergedConf = conf;
    if (!replicationPeer.getPeerConfig().getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(conf);
      compound.addStringMap(replicationPeer.getPeerConfig().getConfiguration());
      mergedConf = compound;
    }

    src.init(mergedConf, fs, this, queueStorage, replicationPeer, server, queueData, clusterId,
      walFileLengthProvider, new MetricsSource(queueData.getId().toString()));
    return src;
  }

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add new replication
   * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
   * group and do replication.
   * <p/>
   * We add an {@code init} parameter to indicate whether this is part of the initialization
   * process. If so, we should skip adding the replication queues as this may introduce a dead lock
   * between region server start up and hbase:replication table online.
   * @param peerId the id of the replication peer
   * @param init   whether this call is part of the initialization process
   */
  void addSource(String peerId, boolean init) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    if (
      ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
        .equals(peer.getPeerConfig().getReplicationEndpointImpl())
    ) {
      // we do not use this endpoint for region replication any more, see HBASE-26233
      LOG.info("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
      return;
    }
    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
    ReplicationSourceInterface src =
      createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(queueId, walsByGroup);
      // Add the latest wal to that source's queue
      if (!latestPaths.isEmpty()) {
        for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
          Path walPath = walPrefixAndPath.getValue();
          NavigableSet<String> wals = new TreeSet<>();
          wals.add(walPath.getName());
          walsByGroup.put(walPrefixAndPath.getKey(), wals);
          if (!init) {
            // Abort RS and throw exception to make add peer fail
            // Ideally we would use the current file size as the offset so we could skip
            // replicating the data written before adding the replication peer, but the problem is
            // that the file may not end at a valid entry's ending, and the current WAL Reader
            // implementation can not deal with reading from the middle of a WAL entry. Can improve
            // later.
            abortAndThrowIOExceptionWhenFail(
              () -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
                new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
          }
          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
        }
      }
    }
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    if (peerConfig.isSyncReplication()) {
      syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
    }
    src.startup();
  }

  /**
   * <p>
   * This is used when we transit a sync replication peer to {@link SyncReplicationState#STANDBY}.
   * </p>
   * <p>
   * When transiting to {@link SyncReplicationState#STANDBY}, we can remove all the pending wal
   * files for a replication peer as we do not need to replicate them any more. And this is
   * necessary, otherwise when we transit back to {@link SyncReplicationState#DOWNGRADE_ACTIVE}
   * later, the stale data will be replicated again and cause inconsistency.
   * </p>
   * <p>
   * See HBASE-20426 for more details.
   * </p>
   * @param peerId the id of the sync replication peer
   */
  public void drainSources(String peerId) throws IOException, ReplicationException {
    String terminateMessage = "Sync replication peer " + peerId
      + " is transiting to STANDBY. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    assert peer.getPeerConfig().isSyncReplication();
    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
    // TODO: use empty initial offsets for now, revisit when adding support for sync replication
    ReplicationSourceInterface src =
      createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
    // synchronized here to avoid race with postLogRoll where we add new log to source and also
    // walsById.
    ReplicationSourceInterface toRemove;
    ReplicationQueueData queueData;
    synchronized (latestPaths) {
      // Here we make a copy of all the remaining wal files and then delete them from the
      // replication queue storage after releasing the lock. It is not safe to just remove the old
      // map from walsById since later we may fail to update the replication queue storage, and
      // when we retry next time, we can not know the wal files that need to be set to the
      // replication queue storage
      ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder();
      synchronized (walsById) {
        walsById.get(queueId).forEach((group, wals) -> {
          if (!wals.isEmpty()) {
            builder.put(group, new ReplicationGroupOffset(wals.last(), -1));
          }
        });
      }
      queueData = new ReplicationQueueData(queueId, builder.build());
      src = createSource(queueData, peer);
      toRemove = sources.put(peerId, src);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        toRemove.terminate(terminateMessage);
        toRemove.getSourceMetrics().clear();
      }
    }
    for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) {
      queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap());
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();
    synchronized (walsById) {
      Map<String, NavigableSet<String>> wals = walsById.get(queueId);
      queueData.getOffsets().forEach((group, offset) -> {
        NavigableSet<String> walsByGroup = wals.get(group);
        if (walsByGroup != null) {
          walsByGroup.headSet(offset.getWal(), true).clear();
        }
      });
    }
    // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker
    // is a background task, we will delete the file from replication queue storage under the lock
    // to simplify the logic.
    synchronized (this.oldsources) {
      for (Iterator<ReplicationSourceInterface> iter = oldsources.iterator(); iter.hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          ReplicationQueueId oldSourceQueueId = oldSource.getQueueId();
          oldSource.terminate(terminateMessage);
          oldSource.getSourceMetrics().clear();
          queueStorage.removeQueue(oldSourceQueueId);
          walsByIdRecoveredQueues.remove(oldSourceQueueId);
          iter.remove();
        }
      }
    }
  }

  private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId,
    ReplicationPeer peer) throws IOException, ReplicationException {
    Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
    return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer);
  }

  /**
   * Close the previous replication sources of this peer id and open new sources to trigger the new
   * replication state changes or new replication config changes. Here we don't need to change the
   * replication queue storage; we only need to enqueue all logs to the new replication source.
   * @param peerId the id of the replication peer
   */
  public void refreshSources(String peerId) throws ReplicationException, IOException {
    String terminateMessage = "Peer " + peerId
      + " state or config changed. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
    ReplicationSourceInterface src;
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.remove(peerId);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      src = createRefreshedSource(queueId, peer);
      this.sources.put(peerId, src);
      for (NavigableSet<String> walsByGroup : walsById.get(queueId).values()) {
        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid race with NodeFailoverWorker
    synchronized (this.oldsources) {
      List<ReplicationQueueId> oldSourceQueueIds = new ArrayList<>();
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
        .hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          oldSourceQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          iter.remove();
        }
      }
      for (ReplicationQueueId oldSourceQueueId : oldSourceQueueIds) {
        ReplicationSourceInterface recoveredReplicationSource =
          createRefreshedSource(oldSourceQueueId, peer);
        this.oldsources.add(recoveredReplicationSource);
        for (NavigableSet<Path> walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId)
          .values()) {
          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(wal));
        }
        toStartup.add(recoveredReplicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source
   * @param src source to clear
   */
  private boolean removeRecoveredSource(ReplicationSourceInterface src) {
    if (!this.oldsources.remove(src)) {
      return false;
    }
    LOG.info("Done with the recovered queue {}", src.getQueueId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
    return true;
  }

  void finishRecoveredSource(ReplicationSourceInterface src) {
    synchronized (oldsources) {
      if (!removeRecoveredSource(src)) {
        return;
      }
    }
    LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
      src.getStats());
  }

  /**
   * Clear the metrics and related replication queue of the specified source
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source
   * @param queueId the id of the replication queue to delete
   */
  private void deleteQueue(ReplicationQueueId queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(queueId));
  }

  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }

  /**
   * Refreshing a replication source will terminate the old source first, and then the source
   * thread will be interrupted. We need to handle this case instead of aborting the region server.
   */
  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      if (
        e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
          && e.getCause().getCause() != null
          && e.getCause().getCause() instanceof InterruptedException
      ) {
        // ReplicationRuntimeException (a RuntimeException) is thrown out here. The reason is that
        // the thread is interrupted deep down in the stack, so it should bypass the following
        // processing logic and propagate to the top-most layer which can handle this exception
        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
        throw new ReplicationRuntimeException(
          "Thread is interrupted, the replication source may be terminated",
          e.getCause().getCause());
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
  }

  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }

  /**
   * This method will log the current position to storage and also clean old logs from the
   * replication queue.
   * @param source     the replication source
   * @param entryBatch the wal entry batch we just shipped
   */
  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
    WALEntryBatch entryBatch) {
    String walName = entryBatch.getLastWalPath().getName();
    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(walName);
    // if end of file, we just set the offset to -1 so we know that this file has already been
    // fully replicated, otherwise we need to compare the file length
    ReplicationGroupOffset offset = new ReplicationGroupOffset(walName,
      entryBatch.isEndOfFile() ? -1 : entryBatch.getLastWalPosition());
    interruptOrAbortWhenFail(() -> this.queueStorage.setOffset(source.getQueueId(), walPrefix,
      offset, entryBatch.getLastSeqIds()));
    cleanOldLogs(walName, entryBatch.isEndOfFile(), source);
  }
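
  // Illustrative sketch (added for readability, not code used by this class) of the offset
  // convention persisted above; the WAL name is hypothetical:
  //   new ReplicationGroupOffset("rs1%2C16020%2C1688000000000.1688000001111", 8192) // replicated up to byte 8192
  //   new ReplicationGroupOffset("rs1%2C16020%2C1688000000000.1688000001111", -1)   // whole file already replicated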

  /**
   * Cleans a log file and all older logs from replication queue. Called when we are sure that a
   * log file is closed and has no more entries.
   * @param log       Path to the log
   * @param inclusive whether we should also remove the given log file
   * @param source    the replication source
   */
  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (source.isRecovered()) {
      NavigableSet<Path> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
      if (wals != null) {
        // here we just want to compare the timestamp, so it is OK to just create a fake WAL path
        NavigableSet<String> walsToRemove = wals.headSet(new Path(oldLogDir, log), inclusive)
          .stream().map(Path::getName).collect(Collectors.toCollection(TreeSet::new));
        if (walsToRemove.isEmpty()) {
          return;
        }
        cleanOldLogs(walsToRemove, source);
        walsToRemove.clear();
      }
    } else {
      NavigableSet<String> wals;
      NavigableSet<String> walsToRemove;
      // synchronized on walsById to avoid race with postLogRoll
      synchronized (this.walsById) {
        wals = walsById.get(source.getQueueId()).get(logPrefix);
        if (wals == null) {
          return;
        }
        walsToRemove = wals.headSet(log, inclusive);
        if (walsToRemove.isEmpty()) {
          return;
        }
        walsToRemove = new TreeSet<>(walsToRemove);
      }
      // cleanOldLogs may spend some time, especially for sync replication where we may want to
      // remove remote wals as the remote cluster may have already been down, so we do it outside
      // the lock to avoid blocking postLogRoll
      cleanOldLogs(walsToRemove, source);
      // now let's remove the files in the set
      synchronized (this.walsById) {
        wals.removeAll(walsToRemove);
      }
    }
  }

  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
    throws IOException {
    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
    for (String wal : wals) {
      Path walFile = new Path(remoteWALDirForPeer, wal);
      try {
        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
          throw new IOException("Can not delete " + walFile);
        }
      } catch (FileNotFoundException e) {
        // Just ignore since this means the file has already been deleted.
        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting a
        // nonexistent file, so here we deal with both, i.e., check the return value of
        // FileSystem.delete, and also catch FNFE.
        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
    // The intention here is that we want to delete the remote wal files ASAP as it may affect the
    // failover time if you want to transit the remote cluster from S to A. And the infinite retry
    // is not a problem, as if we can not contact the remote HDFS cluster, then usually we can not
    // contact the HBase cluster either, so the replication will be blocked anyway.
    if (source.isSyncReplication()) {
      String peerId = source.getPeerId();
      String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
      // Filter out the wals that need to be removed from the remote directory. Their names should
      // be in the special format, and also, the peer id in the name should match the peer id of
      // the replication source.
      List<String> remoteWals =
        wals.stream().filter(w -> AbstractWALProvider.getSyncReplicationPeerIdFromWALName(w)
          .map(peerId::equals).orElse(false)).collect(Collectors.toList());
      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
        remoteWALDir, remoteWals);
      if (!remoteWals.isEmpty()) {
        for (int sleepMultiplier = 0;;) {
          try {
            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
            break;
          } catch (IOException e) {
            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
              peerId);
          }
          if (!source.isSourceActive()) {
            // skip the following operations
            return;
          }
          if (
            ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
              sleepMultiplier, maxRetriesMultiplier)
          ) {
            sleepMultiplier++;
          }
        }
      }
    }
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  public void postLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths to avoid a newly opened source missing the new log
    synchronized (this.latestPaths) {
      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<ReplicationQueueId, Map<String, NavigableSet<String>>> entry : this.walsById
          .entrySet()) {
          ReplicationQueueId queueId = entry.getKey();
          String peerId = queueId.getPeerId();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, we don't need to keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
      latestPaths.put(logPrefix, newLog);
    }
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", newLog,
        source.getQueueId());
    }
  }
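
  // Hypothetical example (the file name is made up) of the mapping used above, assuming the
  // default "<prefix>.<timestamp>" WAL naming where the prefix identifies the wal group:
  //   AbstractFSWALProvider.getWALPrefixFromWALName("rs1%2C16020%2C1688000000000.1688000001111")
  //     -> "rs1%2C16020%2C1688000000000"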

  /**
   * Check whether we should replicate the given {@code wal}.
   * @param wal the file name of the wal
   * @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}.
   */
  private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
    // skip replicating meta wals
    if (AbstractFSWALProvider.isMetaFile(wal)) {
      return false;
    }
    return ReplicationOffsetUtil.shouldReplicate(offset, wal);
  }

  void claimQueue(ReplicationQueueId queueId) {
    claimQueue(queueId, false);
  }

  // sorted from oldest to newest
  private PriorityQueue<Path> getWALFilesToReplicate(ServerName sourceRS, boolean syncUp,
    Map<String, ReplicationGroupOffset> offsets) throws IOException {
    List<Path> walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS,
      URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name()));
    if (syncUp) {
      // we also need to list WALs directory for ReplicationSyncUp
      walFiles.addAll(AbstractFSWALProvider.getWALFiles(conf, sourceRS));
    }
    PriorityQueue<Path> walFilesPQ =
      new PriorityQueue<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
    // sort the wal files and also filter out replicated files
    for (Path file : walFiles) {
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName());
      ReplicationGroupOffset groupOffset = offsets.get(walGroupId);
      if (shouldReplicate(groupOffset, file.getName())) {
        walFilesPQ.add(file);
      } else {
        LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
          groupOffset);
      }
    }
    return walFilesPQ;
  }
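
  // Illustrative sketch of the intended poll order of the queue returned above (oldest timestamp
  // first); the file names are hypothetical and assume the default "<prefix>.<timestamp>" naming:
  //   rs1%2C16020%2C1688000000000.1688000001000   <- polled first
  //   rs1%2C16020%2C1688000000000.1688000002000
  //   rs1%2C16020%2C1688000000000.1688000003000   <- polled last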

  private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer,
    ReplicationQueueId claimedQueueId, PriorityQueue<Path> walFiles) {
    ReplicationPeerImpl peer = replicationPeers.getPeer(src.getPeerId());
    if (peer == null || peer != oldPeer) {
      src.terminate("Recovered queue doesn't belong to any current peer");
      deleteQueue(claimedQueueId);
      return;
    }
    // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is
    // transiting to STANDBY state. The only exception is we are in STANDBY state and
    // transiting to DA, under this state we will replay the remote WAL and they need to be
    // replicated back.
    if (peer.getPeerConfig().isSyncReplication()) {
      Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
        peer.getSyncReplicationStateAndNewState();
      if (
        (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
          && stateAndNewState.getSecond().equals(SyncReplicationState.NONE))
          || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)
      ) {
        src.terminate("Sync replication peer is in STANDBY state");
        deleteQueue(claimedQueueId);
        return;
      }
    }
    // track sources in walsByIdRecoveredQueues
    Map<String, NavigableSet<Path>> walsByGroup = new HashMap<>();
    walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup);
    for (Path wal : walFiles) {
      String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
      NavigableSet<Path> wals = walsByGroup.get(walPrefix);
      if (wals == null) {
        wals = new TreeSet<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
        walsByGroup.put(walPrefix, wals);
      }
      wals.add(wal);
    }
    oldsources.add(src);
    LOG.info("Added source for recovered queue {}, number of wals to replicate: {}",
      claimedQueueId, walFiles.size());
    for (Path wal : walFiles) {
      LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId);
      src.enqueueLog(wal);
    }
    src.startup();
  }

  /**
   * Claim a replication queue.
   * <p/>
   * We add a flag to indicate whether we are called by ReplicationSyncUp. For a normal claim queue
   * operation, we are the last step of a SCP, so we can assume that all the WAL files are under
   * the oldWALs directory. But for ReplicationSyncUp, we may want to claim the replication queue
   * for a region server which has not been processed by SCP yet, so we still need to look at its
   * WALs directory.
   * @param queueId the replication queue id we want to claim
   * @param syncUp  whether we are called by ReplicationSyncUp
   */
  void claimQueue(ReplicationQueueId queueId, boolean syncUp) {
    // Wait a bit before transferring the queues, we may be shutting down.
    // This sleep may not be enough in some cases.
    try {
      Thread.sleep(sleepBeforeFailover
        + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting before transferring a queue.");
      Thread.currentThread().interrupt();
    }
    // We try to lock that rs' queue directory
    if (server.isStopped()) {
      LOG.info("Not transferring queue since we are shutting down");
      return;
    }
    // After claiming the queues from a dead region server, we will skip starting the
    // RecoveredReplicationSource if the peer has been removed. But it is possible that a peer with
    // peerId = 2 is removed and then a peer with peerId = 2 is added again during failover. So we
    // need to get a copy of the replication peer first to decide whether we should start the
    // RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip
    // starting the RecoveredReplicationSource, otherwise the rs will abort (See HBASE-20475).
    String peerId = queueId.getPeerId();
    ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
    if (oldPeer == null) {
      LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist",
        peerId, queueId);
      return;
    }
    Map<String, ReplicationGroupOffset> offsets;
    try {
      offsets = queueStorage.claimQueue(queueId, server.getServerName());
    } catch (ReplicationException e) {
      LOG.error("ReplicationException: cannot claim dead region ({})'s replication queue",
        queueId.getServerName(), e);
      server.abort("Failed to claim queue from dead regionserver.", e);
      return;
    }
    if (offsets.isEmpty()) {
      // someone else claimed the queue
      return;
    }
    ServerName sourceRS = queueId.getServerWALsBelongTo();
    ReplicationQueueId claimedQueueId = queueId.claim(server.getServerName());
    ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
    if (peer == null || peer != oldPeer) {
      LOG.warn("Skipping failover for peer {} of node {}, peer is null", peerId, sourceRS);
      deleteQueue(claimedQueueId);
      return;
    }
    ReplicationSourceInterface src;
    try {
      src =
        createSource(new ReplicationQueueData(claimedQueueId, ImmutableMap.copyOf(offsets)), peer);
    } catch (IOException e) {
      LOG.error("Can not create replication source for peer {} and queue {}", peerId,
        claimedQueueId, e);
      server.abort("Failed to create replication source after claiming queue.", e);
      return;
    }
    PriorityQueue<Path> walFiles;
    try {
      walFiles = getWALFilesToReplicate(sourceRS, syncUp, offsets);
    } catch (IOException e) {
      LOG.error("Can not list wal files for peer {} and queue {}", peerId, queueId, e);
      server.abort("Can not list wal files after claiming queue.", e);
      return;
    }
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    synchronized (oldsources) {
      addRecoveredSource(src, oldPeer, claimedQueueId, walFiles);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.terminate("Region server is closing");
    }
    synchronized (oldsources) {
      for (ReplicationSourceInterface source : this.oldsources) {
        source.terminate("Region server is closing");
      }
    }
  }

  /**
   * Get a copy of the wals of the normal sources on this rs
   * @return an unmodifiable view of the tracked wal names, grouped by queue id and wal prefix
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public Map<ReplicationQueueId, Map<String, NavigableSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return new ArrayList<>(this.sources.values());
  }

  /**
   * Get a list of all the recovered sources of this rs
   * @return list of all recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  /**
   * Get the normal source for a given peer
   * @return the normal source for the given peer if it exists, otherwise null.
   */
  public ReplicationSourceInterface getSource(String peerId) {
    return this.sources.get(peerId);
  }

  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  Set<Path> getLastestPath() {
    synchronized (latestPaths) {
      return Sets.newHashSet(latestPaths.values());
    }
  }

  public long getTotalBufferUsed() {
    return totalBufferUsed.get();
  }

  /**
   * Returns the maximum size in bytes of edits held in memory which are pending replication across
   * all sources inside this RegionServer.
   */
  public long getTotalBufferLimit() {
    return totalBufferLimit;
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {
    return this.replicationPeers;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    // Print stats that apply across all Replication Sources
    stats.append("Global stats: ");
    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=")
      .append(getTotalBufferLimit()).append("B\n");
    for (ReplicationSourceInterface source : this.sources.values()) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws IOException {
    for (ReplicationSourceInterface source : this.sources.values()) {
      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
  }

  int activeFailoverTaskCount() {
    return executor.getActiveCount();
  }

  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
    return this.globalMetrics;
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }
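
  // A minimal usage sketch of the buffer accounting implemented below; the real callers are the
  // WAL reader/shipper threads, and the variable names here are made up for illustration:
  //
  //   if (manager.checkBufferQuota(peerId)) {
  //     boolean overQuota = manager.acquireWALEntryBufferQuota(batch, entry);
  //     // if overQuota is true, stop adding entries and ship the batch now
  //   }
  //   // after the batch has been shipped (or dropped):
  //   manager.releaseWALEntryBatchBufferQuota(batch);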

  /**
   * Acquire the buffer quota for an {@link Entry} which is added to a {@link WALEntryBatch}.
   * @param entry the wal entry which is added to {@link WALEntryBatch} and should acquire buffer
   *              quota.
   * @return true if we should clear the buffer and push all
   */
  boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry) {
    long entrySize = walEntryBatch.incrementUsedBufferSize(entry);
    return this.acquireBufferQuota(entrySize);
  }

  /**
   * Release the buffer quota of a {@link WALEntryBatch} which was acquired by
   * {@link ReplicationSourceManager#acquireWALEntryBufferQuota}.
   * @return the released buffer quota size.
   */
  long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) {
    long usedBufferSize = walEntryBatch.getUsedBufferSize();
    if (usedBufferSize > 0) {
      this.releaseBufferQuota(usedBufferSize);
    }
    return usedBufferSize;
  }

  /**
   * Add the size to {@link ReplicationSourceManager#totalBufferUsed} and check if it exceeds
   * {@link ReplicationSourceManager#totalBufferLimit}.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   *         {@link ReplicationSourceManager#totalBufferLimit}, which means we should stop
   *         increasing the buffer and ship all.
   */
  boolean acquireBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be less than 0");
    }
    long newBufferUsed = addTotalBufferUsed(size);
    return newBufferUsed >= totalBufferLimit;
  }

  /**
   * Release the buffer quota which was acquired by
   * {@link ReplicationSourceManager#acquireBufferQuota}.
   */
  void releaseBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be less than 0");
    }
    addTotalBufferUsed(-size);
  }

  private long addTotalBufferUsed(long size) {
    if (size == 0) {
      return totalBufferUsed.get();
    }
    long newBufferUsed = totalBufferUsed.addAndGet(size);
    // Record the new buffer usage
    this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
    return newBufferUsed;
  }

  /**
   * Check if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   * {@link ReplicationSourceManager#totalBufferLimit} for the given peer.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} is not more than
   *         {@link ReplicationSourceManager#totalBufferLimit}.
   */
  boolean checkBufferQuota(String peerId) {
    // try not to go over the total quota
    if (totalBufferUsed.get() > totalBufferLimit) {
      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
        peerId, totalBufferUsed.get(), totalBufferLimit);
      return false;
    }
    return true;
  }
}