/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AbstractWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and our only goal is to finish
 * replicating the WAL queue it had</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
 * in order to transfer all the queues in a local old source.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
 * there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
 * operations.</li>
 * <li>Need to synchronize on {@link #walsById}. There are four methods which modify it,
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #postLogRoll(Path)}. {@link #walsById} is a ConcurrentHashMap and there is a Lock for
 * peer id in {@link PeerProcedureHandlerImpl}, so there is no race between
 * {@link #addPeer(String)} and {@link #removePeer(String)}.
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
 * {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
 * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} first and then
 * remove the wals from {@link #walsById}, so there is no race with {@link #removePeer(String)}.
 * The only case that needs synchronization is between
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #postLogRoll(Path)}.</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it, {@link #removePeer(String)},
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #claimQueue(ReplicationQueueId)}.
 * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first and then remove the wals from
 * {@link #walsByIdRecoveredQueues}. And {@link #claimQueue(ReplicationQueueId)} will add the wals
 * to {@link #walsByIdRecoveredQueues} first and then start up a {@link ReplicationSourceInterface}.
 * So there is no race here. For {@link #claimQueue(ReplicationQueueId)} and
 * {@link #removePeer(String)}, there is already synchronization on {@link #oldsources}, so there is
 * no need to synchronize on {@link #walsByIdRecoveredQueues}.</li>
 * <li>Need to synchronize on {@link #latestPaths} to avoid a newly opened source missing a new
 * log.</li>
 * <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
 */
@InterfaceAudience.Private
public class ReplicationSourceManager {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // all the sources that read this RS's logs and every peer only has one replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;

  /**
   * Storage for queues that need persistence; e.g. Replication state so it can be recovered after
   * a crash. queueStorage upkeep is spread about this class and passed to ReplicationSource
   * instances for these to do updates themselves. Not all ReplicationSource instances keep state.
   */
  private final ReplicationQueueStorage queueStorage;

  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  // the map is: queue_id->logPrefix/logGroup->logs
  // for a recovered source, the WAL files should already have been moved to oldLogDir, and we have
  // different layouts of old WAL files, for example, with server name sub directories or not, so
  // here we record the full path instead of just the name, so that when refreshing we can enqueue
  // the WAL file again, without trying to guess the real path of the WAL files.
  private final ConcurrentMap<ReplicationQueueId,
    Map<String, NavigableSet<Path>>> walsByIdRecoveredQueues;

  private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly coming peers
  private final Map<String, Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFactory walFactory;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private AtomicLong totalBufferUsed = new AtomicLong();

  // How long we should sleep for each retry when deleting remote wal files for a sync replication
  // peer.
  private final long sleepForRetries;
  // Maximum number of retries before taking bold actions when deleting remote wal files for a sync
  // replication peer.
  private final int maxRetriesMultiplier;
  // Total buffer size on this RegionServer for holding batched edits to be shipped.
  private final long totalBufferLimit;
  private final MetricsReplicationGlobalSourceSource globalMetrics;

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param queueStorage the interface for manipulating replication queues
   * @param conf         the configuration to use
   * @param server       the server for this region server
   * @param fs           the file system to use
   * @param logDir       the directory that contains all wal directories of live RSs
   * @param oldLogDir    the directory where old logs are archived
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
    ReplicationPeers replicationPeers, Configuration conf, Server server, FileSystem fs,
    Path logDir, Path oldLogDir, UUID clusterId, WALFactory walFactory,
    SyncReplicationPeerMappingManager syncReplicationPeerMappingManager,
    MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    // 30 seconds
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
    this.clusterId = clusterId;
    this.walFactory = walFactory;
    this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashMap<>();
    this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
    this.maxRetriesMultiplier =
      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.globalMetrics = globalMetrics;
  }
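
  // An illustrative (not authoritative) sketch of how the tuning knobs read by the constructor
  // above can be set in hbase-site.xml; the values shown are only examples and simply restate the
  // defaults passed to conf.getLong/getInt:
  //
  //   <property>
  //     <name>replication.sleep.before.failover</name>
  //     <value>30000</value> <!-- ms to wait before claiming a dead RS's queues -->
  //   </property>
  //   <property>
  //     <name>replication.executor.workers</name>
  //     <value>1</value> <!-- threads used for claiming queues -->
  //   </property>
  //   <property>
  //     <name>replication.source.sync.sleepforretries</name>
  //     <value>1000</value> <!-- ms between retries when deleting remote wals -->
  //   </property>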

  /**
   * Adds a normal source per registered peer cluster.
   */
  void init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id, true);
    }
  }

  /**
   * <ol>
   * <li>Add peer to replicationPeers</li>
   * <li>Add the normal source and related replication queue</li>
   * <li>Add HFile Refs</li>
   * </ol>
   * @param peerId the id of replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId, false);
    }
  }

  /**
   * <ol>
   * <li>Remove peer from replicationPeers</li>
   * <li>Remove all the recovered sources for the specified id and related replication queues</li>
   * <li>Remove the normal source and related replication queue</li>
   * <li>Remove HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    ReplicationPeer peer = replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding a recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG.info("Number of deleted recovered sources for {}: {}", peerId, oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    }
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    if (peerConfig.isSyncReplication()) {
      syncReplicationPeerMappingManager.remove(peerId, peerConfig);
    }
  }
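
  // Hedged usage sketch: addPeer/removePeer above are not called directly by users; they are
  // driven by peer procedures started from the client side. A hypothetical client-side trigger
  // could look like the following (peer id "2" and the cluster key are made-up example values):
  //
  //   admin.addReplicationPeer("2",
  //     ReplicationPeerConfig.newBuilder().setClusterKey("zk1:2181:/hbase").build());
  //   ...
  //   admin.removeReplicationPeer("2");
  //
  // Each region server then ends up invoking addPeer("2")/removePeer("2") through
  // PeerProcedureHandlerImpl, which holds the per-peer lock mentioned in the class comment.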

  /**
   * @return a new 'classic' user-space replication source.
   * @param queueId the id of the replication queue to associate the ReplicationSource with.
   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
   */
  private ReplicationSourceInterface createSource(ReplicationQueueData queueData,
    ReplicationPeer replicationPeer) throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueData.getId());
    // Init the just created replication source. Pass the default walProvider's wal file length
    // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
    // replication, see #createCatalogReplicationSource().
    WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null
      ? this.walFactory.getWALProvider().getWALFileLengthProvider()
      : p -> OptionalLong.empty();
    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueData, clusterId,
      walFileLengthProvider, new MetricsSource(queueData.getId().toString()));
    return src;
  }

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add a new replication
   * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
   * group and do replication.
   * <p/>
   * We add an {@code init} parameter to indicate whether this is part of the initialization
   * process. If so, we should skip adding the replication queues as this may introduce a deadlock
   * on region server start up and hbase:replication table online.
   * @param peerId the id of the replication peer
   * @param init   whether this call is part of the initialization process
   */
  void addSource(String peerId, boolean init) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    if (
      ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
        .equals(peer.getPeerConfig().getReplicationEndpointImpl())
    ) {
      // we do not use this endpoint for region replication any more, see HBASE-26233
      LOG.info("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
      return;
    }
    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
    ReplicationSourceInterface src =
      createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(queueId, walsByGroup);
      // Add the latest wal to that source's queue
      if (!latestPaths.isEmpty()) {
        for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
          Path walPath = walPrefixAndPath.getValue();
          NavigableSet<String> wals = new TreeSet<>();
          wals.add(walPath.getName());
          walsByGroup.put(walPrefixAndPath.getKey(), wals);
          if (!init) {
            // Abort the RS and throw an exception to make the add peer request fail.
            // Ideally we should use the current file size as the offset so we can skip replicating
            // the data written before adding the replication peer, but the problem is that the
            // file may not end at a valid entry's ending, and the current WAL Reader
            // implementation cannot deal with reading from the middle of a WAL entry. Can improve
            // later.
            abortAndThrowIOExceptionWhenFail(
              () -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
                new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
          }
          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
        }
      }
    }
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    if (peerConfig.isSyncReplication()) {
      syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
    }
    src.startup();
  }

  /**
   * <p>
   * This is used when we transition a sync replication peer to
   * {@link SyncReplicationState#STANDBY}.
   * </p>
   * <p>
   * When transitioning to {@link SyncReplicationState#STANDBY}, we can remove all the pending wal
   * files for a replication peer as we do not need to replicate them any more. And this is
   * necessary, otherwise when we transition back to {@link SyncReplicationState#DOWNGRADE_ACTIVE}
   * later, the stale data will be replicated again and cause inconsistency.
   * </p>
   * <p>
   * See HBASE-20426 for more details.
   * </p>
   * @param peerId the id of the sync replication peer
   */
  public void drainSources(String peerId) throws IOException, ReplicationException {
    String terminateMessage = "Sync replication peer " + peerId
      + " is transiting to STANDBY. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    assert peer.getPeerConfig().isSyncReplication();
    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
    // TODO: use empty initial offsets for now, revisit when adding support for sync replication
    ReplicationSourceInterface src =
      createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer);
    // synchronized here to avoid race with postLogRoll where we add a new log to the source and
    // also to walsById.
    ReplicationSourceInterface toRemove;
    ReplicationQueueData queueData;
    synchronized (latestPaths) {
      // Here we make a copy of all the remaining wal files and then delete them from the
      // replication queue storage after releasing the lock. It is not safe to just remove the old
      // map from walsById since later we may fail to update the replication queue storage, and
      // when we retry next time, we can not know the wal files that need to be set to the
      // replication queue storage
      ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder();
      synchronized (walsById) {
        walsById.get(queueId).forEach((group, wals) -> {
          if (!wals.isEmpty()) {
            builder.put(group, new ReplicationGroupOffset(wals.last(), -1));
          }
        });
      }
      queueData = new ReplicationQueueData(queueId, builder.build());
      src = createSource(queueData, peer);
      toRemove = sources.put(peerId, src);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        toRemove.terminate(terminateMessage);
        toRemove.getSourceMetrics().clear();
      }
    }
    for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) {
      queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap());
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();
    synchronized (walsById) {
      Map<String, NavigableSet<String>> wals = walsById.get(queueId);
      queueData.getOffsets().forEach((group, offset) -> {
        NavigableSet<String> walsByGroup = wals.get(group);
        if (walsByGroup != null) {
          walsByGroup.headSet(offset.getWal(), true).clear();
        }
      });
    }
    // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker
    // is a background task, we will delete the file from replication queue storage under the lock
    // to simplify the logic.
    synchronized (this.oldsources) {
      for (Iterator<ReplicationSourceInterface> iter = oldsources.iterator(); iter.hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          ReplicationQueueId oldSourceQueueId = oldSource.getQueueId();
          oldSource.terminate(terminateMessage);
          oldSource.getSourceMetrics().clear();
          queueStorage.removeQueue(oldSourceQueueId);
          walsByIdRecoveredQueues.remove(oldSourceQueueId);
          iter.remove();
        }
      }
    }
  }

  private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId,
    ReplicationPeer peer) throws IOException, ReplicationException {
    Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
    return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer);
  }

  /**
   * Close the previous replication sources of this peer id and open new sources to trigger the new
   * replication state changes or new replication config changes. Here we don't need to change the
   * replication queue storage, we only need to enqueue all logs to the new replication source.
   * @param peerId the id of the replication peer
   */
  public void refreshSources(String peerId) throws ReplicationException, IOException {
    String terminateMessage = "Peer " + peerId
      + " state or config changed. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationQueueId queueId = new ReplicationQueueId(server.getServerName(), peerId);
    ReplicationSourceInterface src;
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.remove(peerId);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      src = createRefreshedSource(queueId, peer);
      this.sources.put(peerId, src);
      for (NavigableSet<String> walsByGroup : walsById.get(queueId).values()) {
        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid race with NodeFailoverWorker
    synchronized (this.oldsources) {
      List<ReplicationQueueId> oldSourceQueueIds = new ArrayList<>();
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
        .hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          oldSourceQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          iter.remove();
        }
      }
      for (ReplicationQueueId oldSourceQueueId : oldSourceQueueIds) {
        ReplicationSourceInterface recoveredReplicationSource =
          createRefreshedSource(oldSourceQueueId, peer);
        this.oldsources.add(recoveredReplicationSource);
        for (NavigableSet<Path> walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId)
          .values()) {
          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(wal));
        }
        toStartup.add(recoveredReplicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source
   * @param src source to clear
   */
  private boolean removeRecoveredSource(ReplicationSourceInterface src) {
    if (!this.oldsources.remove(src)) {
      return false;
    }
    LOG.info("Done with the recovered queue {}", src.getQueueId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
    return true;
  }

  void finishRecoveredSource(ReplicationSourceInterface src) {
    synchronized (oldsources) {
      if (!removeRecoveredSource(src)) {
        return;
      }
    }
    LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
      src.getStats());
  }

  /**
   * Clear the metrics and related replication queue of the specified source
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source
   * @param queueId the id of the replication queue to delete
   */
  private void deleteQueue(ReplicationQueueId queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(queueId));
  }

  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }

  /**
   * Refreshing a replication source will terminate the old source first, then the source thread
   * will be interrupted. We need to handle this instead of aborting the region server.
   */
  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      if (
        e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
          && e.getCause().getCause() != null
          && e.getCause().getCause() instanceof InterruptedException
      ) {
        // ReplicationRuntimeException (a RuntimeException) is thrown out here. The reason is
        // that the thread is interrupted deep down in the stack, it should pass the following
        // processing logic and propagate to the top layer which can handle this exception
        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
        throw new ReplicationRuntimeException(
          "Thread is interrupted, the replication source may be terminated",
          e.getCause().getCause());
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
  }

  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }
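
  // A hedged illustration of the offset bookkeeping done by logPositionAndCleanOldLogs below (the
  // WAL name is a made-up example): after shipping a batch that ended at byte 8192 of
  // "rs1%2C16020%2C1600000000000.1600000000123", the stored offset for that wal group becomes
  //
  //   new ReplicationGroupOffset("rs1%2C16020%2C1600000000000.1600000000123", 8192)
  //
  // and once the reader hits the end of that file, the offset is rewritten with -1 so that, on
  // restart or queue claiming, the file is treated as fully replicated and skipped.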

  /**
   * This method will log the current position to storage, and also clean old logs from the
   * replication queue.
   * @param source     the replication source
   * @param entryBatch the wal entry batch we just shipped
   */
  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
    WALEntryBatch entryBatch) {
    String walName = entryBatch.getLastWalPath().getName();
    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(walName);
    // if end of file, we just set the offset to -1 so we know that this file has already been
    // fully replicated, otherwise we need to compare the file length
    ReplicationGroupOffset offset = new ReplicationGroupOffset(walName,
      entryBatch.isEndOfFile() ? -1 : entryBatch.getLastWalPosition());
    interruptOrAbortWhenFail(() -> this.queueStorage.setOffset(source.getQueueId(), walPrefix,
      offset, entryBatch.getLastSeqIds()));
    cleanOldLogs(walName, entryBatch.isEndOfFile(), source);
  }

  /**
   * Cleans a log file and all older logs from the replication queue. Called when we are sure that
   * a log file is closed and has no more entries.
   * @param log       Path to the log
   * @param inclusive whether we should also remove the given log file
   * @param source    the replication source
   */
  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (source.isRecovered()) {
      NavigableSet<Path> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
      if (wals != null) {
        // here we just want to compare the timestamp, so it is OK to just create a fake WAL path
        NavigableSet<String> walsToRemove = wals.headSet(new Path(oldLogDir, log), inclusive)
          .stream().map(Path::getName).collect(Collectors.toCollection(TreeSet::new));
        if (walsToRemove.isEmpty()) {
          return;
        }
        cleanOldLogs(walsToRemove, source);
        walsToRemove.clear();
      }
    } else {
      NavigableSet<String> wals;
      NavigableSet<String> walsToRemove;
      // synchronized on walsById to avoid race with postLogRoll
      synchronized (this.walsById) {
        wals = walsById.get(source.getQueueId()).get(logPrefix);
        if (wals == null) {
          return;
        }
        walsToRemove = wals.headSet(log, inclusive);
        if (walsToRemove.isEmpty()) {
          return;
        }
        walsToRemove = new TreeSet<>(walsToRemove);
      }
      // cleanOldLogs may spend some time, especially for sync replication where we may want to
      // remove remote wals as the remote cluster may already be down, so we do it outside of the
      // lock to avoid blocking postLogRoll
      cleanOldLogs(walsToRemove, source);
      // now let's remove the files in the set
      synchronized (this.walsById) {
        wals.removeAll(walsToRemove);
      }
    }
  }
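
  // Hedged example of the in-memory effect of cleanOldLogs above, with made-up wal names: if the
  // tracked set for a wal group is {"w.1", "w.2", "w.3"} (oldest to newest) and the source reports
  // that "w.2" is fully shipped, then cleanOldLogs("w.2", true, source) drops "w.1" and "w.2" and
  // keeps only "w.3"; with inclusive == false, "w.2" would stay because it may still have entries
  // to ship.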

  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
    throws IOException {
    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
    for (String wal : wals) {
      Path walFile = new Path(remoteWALDirForPeer, wal);
      try {
        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
          throw new IOException("Can not delete " + walFile);
        }
      } catch (FileNotFoundException e) {
        // Just ignore since this means the file has already been deleted.
        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting a
        // nonexistent file, so here we deal with both, i.e., check the return value of
        // FileSystem.delete, and also catch FNFE.
        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
    // The intention here is that we want to delete the remote wal files ASAP as they may affect
    // the failover time if you want to transition the remote cluster from S to A. And the infinite
    // retry is not a problem, as if we can not contact the remote HDFS cluster, then usually we
    // can not contact the HBase cluster either, so the replication will be blocked anyway.
    if (source.isSyncReplication()) {
      String peerId = source.getPeerId();
      String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
      // Filter out the wals that need to be removed from the remote directory. Their names should
      // be in the special format, and also, the peer id in the name should match the peer id of
      // the replication source.
      List<String> remoteWals =
        wals.stream().filter(w -> AbstractWALProvider.getSyncReplicationPeerIdFromWALName(w)
          .map(peerId::equals).orElse(false)).collect(Collectors.toList());
      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
        remoteWALDir, remoteWals);
      if (!remoteWals.isEmpty()) {
        for (int sleepMultiplier = 0;;) {
          try {
            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
            break;
          } catch (IOException e) {
            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
              peerId);
          }
          if (!source.isSourceActive()) {
            // skip the following operations
            return;
          }
          if (
            ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
              sleepMultiplier, maxRetriesMultiplier)
          ) {
            sleepMultiplier++;
          }
        }
      }
    }
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  public void postLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths to avoid a newly opened source missing the new log
    synchronized (this.latestPaths) {
      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<ReplicationQueueId, Map<String, NavigableSet<String>>> entry : this.walsById
          .entrySet()) {
          ReplicationQueueId queueId = entry.getKey();
          String peerId = queueId.getPeerId();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, we don't need to keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
      latestPaths.put(logPrefix, newLog);
    }
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", newLog,
        source.getQueueId());
    }
  }

  /**
   * Check whether we should replicate the given {@code wal}.
   * @param wal the file name of the wal
   * @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}.
   */
  private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
    // skip replicating meta wals
    if (AbstractFSWALProvider.isMetaFile(wal)) {
      return false;
    }
    return ReplicationOffsetUtil.shouldReplicate(offset, wal);
  }

  void claimQueue(ReplicationQueueId queueId) {
    claimQueue(queueId, false);
  }

  // sorted from oldest to newest
  private PriorityQueue<Path> getWALFilesToReplicate(ServerName sourceRS, boolean syncUp,
    Map<String, ReplicationGroupOffset> offsets) throws IOException {
    List<Path> walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS,
      URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name()));
    if (syncUp) {
      // we also need to list the WALs directory for ReplicationSyncUp
      walFiles.addAll(AbstractFSWALProvider.getWALFiles(conf, sourceRS));
    }
    PriorityQueue<Path> walFilesPQ =
      new PriorityQueue<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
    // sort the wal files and also filter out replicated files
    for (Path file : walFiles) {
      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName());
      ReplicationGroupOffset groupOffset = offsets.get(walGroupId);
      if (shouldReplicate(groupOffset, file.getName())) {
        walFilesPQ.add(file);
      } else {
        LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
          groupOffset);
      }
    }
    return walFilesPQ;
  }
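
  // Hedged sketch of the filtering above (all names are invented): suppose claimQueue handed us
  //   offsets = { "rs1%2C16020%2C1600000000000"
  //                 -> ("rs1%2C16020%2C1600000000000.1600000000200", 4096) }
  // and the archive holds that group's files ....100, ....200 and ....300. Then, roughly,
  // ReplicationOffsetUtil.shouldReplicate skips ....100 (older than the offset wal), keeps ....200
  // (replication resumes there, or skips it entirely if the stored offset is -1, meaning fully
  // replicated), and keeps ....300, so the priority queue ends up ordered oldest to newest by the
  // timestamp suffix.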

  private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer,
    ReplicationQueueId claimedQueueId, PriorityQueue<Path> walFiles) {
    ReplicationPeerImpl peer = replicationPeers.getPeer(src.getPeerId());
    if (peer == null || peer != oldPeer) {
      src.terminate("Recovered queue doesn't belong to any current peer");
      deleteQueue(claimedQueueId);
      return;
    }
    // Do not setup a recovered queue if a sync replication peer is in STANDBY state, or is
    // transitioning to STANDBY state. The only exception is when we are in STANDBY state and
    // transitioning to DA; under this state we will replay the remote WALs and they need to be
    // replicated back.
    if (peer.getPeerConfig().isSyncReplication()) {
      Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
        peer.getSyncReplicationStateAndNewState();
      if (
        (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
          && stateAndNewState.getSecond().equals(SyncReplicationState.NONE))
          || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)
      ) {
        src.terminate("Sync replication peer is in STANDBY state");
        deleteQueue(claimedQueueId);
        return;
      }
    }
    // track sources in walsByIdRecoveredQueues
    Map<String, NavigableSet<Path>> walsByGroup = new HashMap<>();
    walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup);
    for (Path wal : walFiles) {
      String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
      NavigableSet<Path> wals = walsByGroup.get(walPrefix);
      if (wals == null) {
        wals = new TreeSet<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
        walsByGroup.put(walPrefix, wals);
      }
      wals.add(wal);
    }
    oldsources.add(src);
    LOG.info("Added source for recovered queue {}, number of wals to replicate: {}",
      claimedQueueId, walFiles.size());
    for (Path wal : walFiles) {
      LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId);
      src.enqueueLog(wal);
    }
    src.startup();
  }

  /**
   * Claim a replication queue.
   * <p/>
   * We add a flag to indicate whether we are called by ReplicationSyncUp. For a normal claim queue
   * operation, we are the last step of a SCP, so we can assume that all the WAL files are under
   * the oldWALs directory. But for ReplicationSyncUp, we may want to claim the replication queue
   * for a region server which has not been processed by SCP yet, so we still need to look at its
   * WALs directory.
   * @param queueId the replication queue id we want to claim
   * @param syncUp  whether we are called by ReplicationSyncUp
   */
  void claimQueue(ReplicationQueueId queueId, boolean syncUp) {
    // Wait a bit before transferring the queues, we may be shutting down.
    // This sleep may not be enough in some cases.
    try {
      Thread.sleep(sleepBeforeFailover
        + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting before transferring a queue.");
      Thread.currentThread().interrupt();
    }
    // We try to lock that rs' queue directory
    if (server.isStopped()) {
      LOG.info("Not transferring queue since we are shutting down");
      return;
    }
    // After claiming the queues from a dead region server, we will skip starting the
    // RecoveredReplicationSource if the peer has been removed. But it is possible that a peer with
    // peerId = 2 is removed and then a peer with peerId = 2 is added again during the failover. So
    // we need to get a copy of the replication peer first to decide whether we should start the
    // RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip
    // starting the RecoveredReplicationSource, otherwise the rs will abort (See HBASE-20475).
    String peerId = queueId.getPeerId();
    ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
    if (oldPeer == null) {
      LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist",
        peerId, queueId);
      return;
    }
    Map<String, ReplicationGroupOffset> offsets;
    try {
      offsets = queueStorage.claimQueue(queueId, server.getServerName());
    } catch (ReplicationException e) {
      LOG.error("ReplicationException: cannot claim dead region ({})'s replication queue",
        queueId.getServerName(), e);
      server.abort("Failed to claim queue from dead regionserver.", e);
      return;
    }
    if (offsets.isEmpty()) {
      // someone else claimed the queue
      return;
    }
    ServerName sourceRS = queueId.getServerWALsBelongTo();
    ReplicationQueueId claimedQueueId = queueId.claim(server.getServerName());
    ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
    if (peer == null || peer != oldPeer) {
      LOG.warn("Skipping failover for peer {} of node {}, peer is null", peerId, sourceRS);
      deleteQueue(claimedQueueId);
      return;
    }
    ReplicationSourceInterface src;
    try {
      src =
        createSource(new ReplicationQueueData(claimedQueueId, ImmutableMap.copyOf(offsets)), peer);
    } catch (IOException e) {
      LOG.error("Can not create replication source for peer {} and queue {}", peerId,
        claimedQueueId, e);
      server.abort("Failed to create replication source after claiming queue.", e);
      return;
    }
    PriorityQueue<Path> walFiles;
    try {
      walFiles = getWALFilesToReplicate(sourceRS, syncUp, offsets);
    } catch (IOException e) {
      LOG.error("Can not list wal files for peer {} and queue {}", peerId, queueId, e);
      server.abort("Can not list wal files after claiming queue.", e);
      return;
    }
    // synchronized on oldsources to avoid adding a recovered source for the to-be-removed peer
    synchronized (oldsources) {
      addRecoveredSource(src, oldPeer, claimedQueueId, walFiles);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.terminate("Region server is closing");
    }
    synchronized (oldsources) {
      for (ReplicationSourceInterface source : this.oldsources) {
        source.terminate("Region server is closing");
      }
    }
  }

  /**
   * Get a copy of the wals of the normal sources on this rs
   * @return a sorted set of wal names
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public Map<ReplicationQueueId, Map<String, NavigableSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return new ArrayList<>(this.sources.values());
  }

  /**
   * Get a list of all the recovered sources of this rs
   * @return list of all recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  /**
   * Get the normal source for a given peer
   * @return the normal source for the given peer if it exists, otherwise null.
   */
  public ReplicationSourceInterface getSource(String peerId) {
    return this.sources.get(peerId);
  }

  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  Set<Path> getLastestPath() {
    synchronized (latestPaths) {
      return Sets.newHashSet(latestPaths.values());
    }
  }

  public long getTotalBufferUsed() {
    return totalBufferUsed.get();
  }

  /**
   * Returns the maximum size in bytes of edits held in memory which are pending replication across
   * all sources inside this RegionServer.
   */
  public long getTotalBufferLimit() {
    return totalBufferLimit;
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {
    return this.replicationPeers;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    // Print stats that apply across all Replication Sources
    stats.append("Global stats: ");
    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=")
      .append(getTotalBufferLimit()).append("B\n");
    for (ReplicationSourceInterface source : this.sources.values()) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws IOException {
    for (ReplicationSourceInterface source : this.sources.values()) {
      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
  }

  int activeFailoverTaskCount() {
    return executor.getActiveCount();
  }

  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
    return this.globalMetrics;
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }
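
  // Hedged usage sketch of the buffer accounting below (the caller shape is illustrative; the
  // real call sites are likely the WAL reader/shipper classes of this package):
  //
  //   if (manager.acquireWALEntryBufferQuota(batch, entry)) {
  //     // global quota reached: stop adding entries and ship this batch now
  //     shipper.ship(batch);
  //     manager.releaseWALEntryBatchBufferQuota(batch);
  //   }
  //
  // checkBufferQuota(peerId) is the cheap pre-check used before reading more edits at all.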

  /**
   * Acquire the buffer quota for an {@link Entry} which is added to a {@link WALEntryBatch}.
   * @param entry the wal entry which is added to {@link WALEntryBatch} and should acquire buffer
   *              quota.
   * @return true if we should clear the buffer and push all
   */
  boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry) {
    long entrySize = walEntryBatch.incrementUsedBufferSize(entry);
    return this.acquireBufferQuota(entrySize);
  }

  /**
   * Release the buffer quota of a {@link WALEntryBatch} which was acquired by
   * {@link ReplicationSourceManager#acquireWALEntryBufferQuota}.
   * @return the released buffer quota size.
   */
  long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) {
    long usedBufferSize = walEntryBatch.getUsedBufferSize();
    if (usedBufferSize > 0) {
      this.releaseBufferQuota(usedBufferSize);
    }
    return usedBufferSize;
  }

  /**
   * Add the size to {@link ReplicationSourceManager#totalBufferUsed} and check if it exceeds
   * {@link ReplicationSourceManager#totalBufferLimit}.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   *         {@link ReplicationSourceManager#totalBufferLimit}, which means we should stop
   *         increasing the buffer and ship all the entries.
   */
  boolean acquireBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be less than 0");
    }
    long newBufferUsed = addTotalBufferUsed(size);
    return newBufferUsed >= totalBufferLimit;
  }

  /**
   * Release the buffer quota which was acquired by
   * {@link ReplicationSourceManager#acquireBufferQuota}.
   */
  void releaseBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be less than 0");
    }
    addTotalBufferUsed(-size);
  }

  private long addTotalBufferUsed(long size) {
    if (size == 0) {
      return totalBufferUsed.get();
    }
    long newBufferUsed = totalBufferUsed.addAndGet(size);
    // Record the new buffer usage
    this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
    return newBufferUsed;
  }

  /**
   * Check if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   * {@link ReplicationSourceManager#totalBufferLimit} for the peer.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} is not more than
   *         {@link ReplicationSourceManager#totalBufferLimit}.
   */
  boolean checkBufferQuota(String peerId) {
    // try not to go over total quota
    if (totalBufferUsed.get() > totalBufferLimit) {
      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
        peerId, totalBufferUsed.get(), totalBufferLimit);
      return false;
    }
    return true;
  }
}