001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.NavigableSet; 028import java.util.OptionalLong; 029import java.util.Set; 030import java.util.SortedSet; 031import java.util.TreeSet; 032import java.util.UUID; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.ConcurrentMap; 035import java.util.concurrent.LinkedBlockingQueue; 036import java.util.concurrent.ThreadLocalRandom; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicLong; 040import java.util.concurrent.atomic.AtomicReference; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.Server; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 050import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 051import org.apache.hadoop.hbase.replication.ReplicationException; 052import org.apache.hadoop.hbase.replication.ReplicationPeer; 053import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 054import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; 055import org.apache.hadoop.hbase.replication.ReplicationPeers; 056import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; 057import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 058import org.apache.hadoop.hbase.util.Pair; 059import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 060import org.apache.hadoop.hbase.wal.WAL; 061import org.apache.hadoop.hbase.wal.WALFactory; 062import org.apache.hadoop.hbase.wal.WALProvider; 063import org.apache.yetus.audience.InterfaceAudience; 064import org.apache.zookeeper.KeeperException; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 069import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 070 071/** 072 * This class is responsible to manage all the replication sources. 
 * There are two classes of sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster.</li>
 * <li>Old sources are recovered from a failed region server and their only goal is to finish
 * replicating the WAL queue it had.</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
 * in order to transfer all the queues to a local old source.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
 * there is a Lock for each peer id in {@link PeerProcedureHandlerImpl}. So there is no race for
 * peer operations.</li>
 * <li>Need to synchronize on {@link #walsById}. There are four methods which modify it,
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
 * {@link #walsById} is a ConcurrentHashMap and there is a Lock for each peer id in
 * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
 * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
 * called by {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
 * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} first, and
 * then remove the wals from {@link #walsById}, so there is no race with
 * {@link #removePeer(String)}. The only case that needs synchronization is between
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first, and then remove the wals from
 * {@link #walsByIdRecoveredQueues}. And
 * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add the wals to
 * {@link #walsByIdRecoveredQueues} first, and then start up a {@link ReplicationSourceInterface}.
 * So there is no race here. For {@link ReplicationSourceManager#claimQueue(ServerName, String)}
 * and {@link #removePeer(String)}, we already synchronize on {@link #oldsources}.
 * So there is no need to synchronize on {@link #walsByIdRecoveredQueues}.</li>
 * <li>Need to synchronize on {@link #latestPaths} to avoid a newly opened source missing a new
 * log.</li>
 * <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
 */
@InterfaceAudience.Private
public class ReplicationSourceManager {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // all the sources that read this RS's logs and every peer only has one replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;

  /**
   * Storage for queues that need persistence; e.g. Replication state so it can be recovered after
   * a crash. queueStorage upkeep is spread about this class and passed to ReplicationSource
   * instances for these to do updates themselves. Not all ReplicationSource instances keep state.
   */
  private final ReplicationQueueStorage queueStorage;

  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  // For a normal replication source, the peer id is the same as the queue id
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  // the map is: queue_id->logPrefix/logGroup->logs
  // For a recovered source, the queue id's format is peer_id-servername-*
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private final Map<String, Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFactory walFactory;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final boolean replicationForBulkLoadDataEnabled;

  private AtomicLong totalBufferUsed = new AtomicLong();
  // Total buffer size on this RegionServer for holding batched edits to be shipped.
  private final long totalBufferLimit;
  private final MetricsReplicationGlobalSourceSource globalMetrics;

  /**
   * A special ReplicationSource for hbase:meta Region Read Replicas. Usually this reference
   * remains empty. If an hbase:meta Region is opened on this server, we will create an instance
   * of a hbase:meta CatalogReplicationSource and it will live the life of the Server thereafter;
   * i.e. we will not shut it down even if the hbase:meta moves away from this server (in case it
   * later gets moved back). We synchronize on this instance when testing for presence and, if
   * absent, while creating it, so it is only created and started once.
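   * @see #addCatalogReplicationSource(RegionInfo)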
   */
  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers.
   * @param queueStorage the interface for manipulating replication queues
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
    ReplicationPeers replicationPeers, Configuration conf, Server server, FileSystem fs,
    Path logDir, Path oldLogDir, UUID clusterId, WALFactory walFactory,
    MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
    // sources is a ConcurrentHashMap, which is thread-safe.
    // Generally, reading is more frequent than modifying.
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    // 30 seconds
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
    this.clusterId = clusterId;
    this.walFactory = walFactory;
    // It's preferable to fail over 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashMap<>();
    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.globalMetrics = globalMetrics;
  }

  /**
   * Adds a normal source per registered peer cluster.
   */
  void init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if the peer exists in the hfile-refs queue; if not, add it. This can happen when
        // a peer was added before replication for bulk loaded data was enabled.
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
      }
    }
  }

  /**
   * 1. Add peer to replicationPeers 2. Add the normal source and related replication queue 3. Add
   * HFile Refs
   * @param peerId the id of the replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId);
      if (replicationForBulkLoadDataEnabled) {
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
      }
    }
  }

  /**
   * 1. Remove peer from replicationPeers 2. Remove all the recovered sources for the specified id
   * and related replication queues 3. Remove the normal source and related replication queue 4.
   * Remove HFile Refs
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see claimQueue(ServerName, String)
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG
      .info("Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    } else {
      // This only happens in the unit test TestReplicationSourceManager#testPeerRemovalCleanup
      // Delete the queue from storage and memory; the queue id is the same as the peer id for a
      // normal source
      deleteQueue(peerId);
      this.walsById.remove(peerId);
    }

    // Remove HFile Refs
    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
  }

  /**
   * @param queueId the id of the replication queue to associate the ReplicationSource with.
   * @return a new 'classic' user-space replication source.
   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
   */
  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
    throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
    // Init the just created replication source. Pass the default walProvider's wal file length
    // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
    // replication, see #createCatalogReplicationSource().
    WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null
      ? this.walFactory.getWALProvider().getWALFileLengthProvider()
      : p -> OptionalLong.empty();
    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
      walFileLengthProvider, new MetricsSource(queueId));
    return src;
  }

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add a new replication
   * queue to storage.
   * For the newly added peer, we only need to enqueue the latest log of each wal group and start
   * replication.
   * @param peerId the id of the replication peer
   * @return the source that was created
   */
  ReplicationSourceInterface addSource(String peerId) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(peerId, walsByGroup);
      // Add the latest wal to that source's queue
      if (!latestPaths.isEmpty()) {
        for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
          Path walPath = walPrefixAndPath.getValue();
          NavigableSet<String> wals = new TreeSet<>();
          wals.add(walPath.getName());
          walsByGroup.put(walPrefixAndPath.getKey(), wals);
          // Abort the RS and throw an exception to make the add peer operation fail
          abortAndThrowIOExceptionWhenFail(
            () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
        }
      }
    }
    src.startup();
    return src;
  }

  /**
   * Close the previous replication sources of this peer id and open new sources to trigger the
   * new replication state changes or new replication config changes. Here we don't need to change
   * the replication queue storage; we only need to enqueue all logs to the new replication source.
   * @param peerId the id of the replication peer
   */
  public void refreshSources(String peerId) throws IOException {
    String terminateMessage = "Peer " + peerId
      + " state or config changed. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
        walsByGroup.forEach(wal -> {
          Path walPath = new Path(this.logDir, wal);
          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
        });
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid race with claimQueue
    synchronized (this.oldsources) {
      List<String> previousQueueIds = new ArrayList<>();
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
        .hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          previousQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          iter.remove();
        }
      }
      for (String queueId : previousQueueIds) {
        ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
        this.oldsources.add(recoveredReplicationSource);
        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
        }
        toStartup.add(recoveredReplicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source
   * @param src source to clear
   */
  void removeRecoveredSource(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getQueueId());
    this.oldsources.remove(src);
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
  }

  /**
   * Clear the metrics and related replication queue of the specified source
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source
   * @param queueId the id of the replication queue to delete
   */
  private void deleteQueue(String queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
  }

  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }

  /**
   * Refreshing a replication source will terminate the old source first, and then the source
   * thread will be interrupted. We need to handle this case instead of aborting the region server.
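   * <p>
   * For example, callers in this class wrap queue storage updates like the following (taken from
   * {@link #cleanOldLogs(NavigableSet, String, boolean, String)}):
   *
   * <pre>
   * interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
   * </pre>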
459 */ 460 private void interruptOrAbortWhenFail(ReplicationQueueOperation op) { 461 try { 462 op.exec(); 463 } catch (ReplicationException e) { 464 if ( 465 e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException 466 && e.getCause().getCause() != null 467 && e.getCause().getCause() instanceof InterruptedException 468 ) { 469 // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is 470 // that thread is interrupted deep down in the stack, it should pass the following 471 // processing logic and propagate to the most top layer which can handle this exception 472 // properly. In this specific case, the top layer is ReplicationSourceShipper#run(). 473 throw new ReplicationRuntimeException( 474 "Thread is interrupted, the replication source may be terminated", 475 e.getCause().getCause()); 476 } 477 server.abort("Failed to operate on replication queue", e); 478 } 479 } 480 481 private void abortWhenFail(ReplicationQueueOperation op) { 482 try { 483 op.exec(); 484 } catch (ReplicationException e) { 485 server.abort("Failed to operate on replication queue", e); 486 } 487 } 488 489 private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException { 490 try { 491 op.exec(); 492 } catch (ReplicationException e) { 493 throw new IOException(e); 494 } 495 } 496 497 private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException { 498 try { 499 op.exec(); 500 } catch (ReplicationException e) { 501 server.abort("Failed to operate on replication queue", e); 502 throw new IOException(e); 503 } 504 } 505 506 /** 507 * This method will log the current position to storage. And also clean old logs from the 508 * replication queue. 509 * @param entryBatch the wal entry batch we just shipped 510 */ 511 public void logPositionAndCleanOldLogs(ReplicationSourceInterface source, 512 WALEntryBatch entryBatch) { 513 String fileName = entryBatch.getLastWalPath().getName(); 514 String queueId = source.getQueueId(); 515 interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, 516 fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds())); 517 cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, source.isRecovered()); 518 } 519 520 /** 521 * Cleans a log file and all older logs from replication queue. Called when we are sure that a log 522 * file is closed and has no more entries. 
   * @param log Path to the log
   * @param inclusive whether we should also remove the given log file
   * @param queueId id of the replication queue
   * @param queueRecovered whether this is a recovered queue
   */
  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (queueRecovered) {
      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
      if (wals != null) {
        cleanOldLogs(wals, log, inclusive, queueId);
      }
    } else {
      // synchronized on walsById to avoid race with preLogRoll
      synchronized (this.walsById) {
        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
        if (wals != null) {
          cleanOldLogs(wals, log, inclusive, queueId);
        }
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
    NavigableSet<String> walSet = wals.headSet(key, inclusive);
    if (walSet.isEmpty()) {
      return;
    }
    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
    for (String wal : walSet) {
      interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
    }
    walSet.clear();
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  public void preLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths to avoid a newly opened source missing the new log
    synchronized (this.latestPaths) {
      // Add log to queue storage
      for (ReplicationSourceInterface source : this.sources.values()) {
        // If recording the log in queue storage fails, abort the RS and throw an exception to
        // make the log roll fail
        abortAndThrowIOExceptionWhenFail(
          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
      }

      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
          .entrySet()) {
          String peerId = entry.getKey();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, we don't need to keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
      latestPaths.put(logPrefix, newLog);
    }
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  public void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", newLog,
        source.getQueueId());
    }
  }

  void claimQueue(ServerName deadRS, String queue) {
    // Wait a bit before transferring the queues, we may be shutting down.
    // This sleep may not be enough in some cases.
    try {
      Thread.sleep(sleepBeforeFailover
        + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting before transferring a queue.");
      Thread.currentThread().interrupt();
    }
    // We try to lock that rs' queue directory
    if (server.isStopped()) {
      LOG.info("Not transferring queue since we are shutting down");
      return;
    }
    // After claiming the queues from the dead region server, we will skip starting the
    // RecoveredReplicationSource if the peer has been removed. But it is possible that a peer with
    // peerId = 2 is removed and then a peer with peerId = 2 is added again during the failover. So
    // we need to get a copy of the replication peer first to decide whether we should start the
    // RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip
    // starting the RecoveredReplicationSource, otherwise the rs will abort (see HBASE-20475).
    String peerId = new ReplicationQueueInfo(queue).getPeerId();
    ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
    if (oldPeer == null) {
      LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist",
        peerId, queue);
      return;
    }
    Pair<String, SortedSet<String>> claimedQueue;
    try {
      claimedQueue = queueStorage.claimQueue(deadRS, queue, server.getServerName());
    } catch (ReplicationException e) {
      LOG.error(
        "ReplicationException: cannot claim dead region server ({})'s replication queue. "
          + "Znode: ({}). Possible solution: check if znode size exceeds jute.maxBuffer value. "
          + "If so, increase it for both client and server side.",
        deadRS, queueStorage.getRsNode(deadRS), e);
      server.abort("Failed to claim queue from dead regionserver.", e);
      return;
    }
    if (claimedQueue.getSecond().isEmpty()) {
      return;
    }
    String queueId = claimedQueue.getFirst();
    Set<String> walsSet = claimedQueue.getSecond();
    ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
    if (peer == null || peer != oldPeer) {
      LOG.warn("Skipping failover for peer {} of node {}, peer is null or has changed", peerId,
        deadRS);
      abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
      return;
    }
    if (
      server instanceof ReplicationSyncUp.DummyServer
        && peer.getPeerState().equals(PeerState.DISABLED)
    ) {
      LOG.warn(
        "Peer {} is disabled.
ReplicationSyncUp tool will skip " + "replicating data to this peer.", 674 peerId); 675 return; 676 } 677 678 ReplicationSourceInterface src; 679 try { 680 src = createSource(queueId, peer); 681 } catch (IOException e) { 682 LOG.error("Can not create replication source for peer {} and queue {}", peerId, queueId, e); 683 server.abort("Failed to create replication source after claiming queue.", e); 684 return; 685 } 686 // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer 687 synchronized (oldsources) { 688 peer = replicationPeers.getPeer(src.getPeerId()); 689 if (peer == null || peer != oldPeer) { 690 src.terminate("Recovered queue doesn't belong to any current peer"); 691 deleteQueue(queueId); 692 return; 693 } 694 // track sources in walsByIdRecoveredQueues 695 Map<String, NavigableSet<String>> walsByGroup = new HashMap<>(); 696 walsByIdRecoveredQueues.put(queueId, walsByGroup); 697 for (String wal : walsSet) { 698 String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); 699 NavigableSet<String> wals = walsByGroup.get(walPrefix); 700 if (wals == null) { 701 wals = new TreeSet<>(); 702 walsByGroup.put(walPrefix, wals); 703 } 704 wals.add(wal); 705 } 706 oldsources.add(src); 707 LOG.info("Added source for recovered queue {}", src.getQueueId()); 708 for (String wal : walsSet) { 709 LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId()); 710 src.enqueueLog(new Path(oldLogDir, wal)); 711 } 712 src.startup(); 713 } 714 } 715 716 /** 717 * Terminate the replication on this region server 718 */ 719 public void join() { 720 this.executor.shutdown(); 721 for (ReplicationSourceInterface source : this.sources.values()) { 722 source.terminate("Region server is closing"); 723 } 724 for (ReplicationSourceInterface source : this.oldsources) { 725 source.terminate("Region server is closing"); 726 } 727 } 728 729 /** 730 * Get a copy of the wals of the normal sources on this rs 731 * @return a sorted set of wal names 732 */ 733 public Map<String, Map<String, NavigableSet<String>>> getWALs() { 734 return Collections.unmodifiableMap(walsById); 735 } 736 737 /** 738 * Get a copy of the wals of the recovered sources on this rs 739 * @return a sorted set of wal names 740 */ 741 Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() { 742 return Collections.unmodifiableMap(walsByIdRecoveredQueues); 743 } 744 745 /** 746 * Get a list of all the normal sources of this rs 747 * @return list of all normal sources 748 */ 749 public List<ReplicationSourceInterface> getSources() { 750 return new ArrayList<>(this.sources.values()); 751 } 752 753 /** 754 * Get a list of all the recovered sources of this rs 755 * @return list of all recovered sources 756 */ 757 public List<ReplicationSourceInterface> getOldSources() { 758 return this.oldsources; 759 } 760 761 /** 762 * Get the normal source for a given peer 763 * @return the normal source for the give peer if it exists, otherwise null. 
764 */ 765 public ReplicationSourceInterface getSource(String peerId) { 766 return this.sources.get(peerId); 767 } 768 769 List<String> getAllQueues() throws IOException { 770 List<String> allQueues = Collections.emptyList(); 771 try { 772 allQueues = queueStorage.getAllQueues(server.getServerName()); 773 } catch (ReplicationException e) { 774 throw new IOException(e); 775 } 776 return allQueues; 777 } 778 779 int getSizeOfLatestPath() { 780 synchronized (latestPaths) { 781 return latestPaths.size(); 782 } 783 } 784 785 Set<Path> getLastestPath() { 786 synchronized (latestPaths) { 787 return Sets.newHashSet(latestPaths.values()); 788 } 789 } 790 791 public AtomicLong getTotalBufferUsed() { 792 return totalBufferUsed; 793 } 794 795 /** 796 * Returns the maximum size in bytes of edits held in memory which are pending replication across 797 * all sources inside this RegionServer. 798 */ 799 public long getTotalBufferLimit() { 800 return totalBufferLimit; 801 } 802 803 /** 804 * Get the directory where wals are archived 805 * @return the directory where wals are archived 806 */ 807 public Path getOldLogDir() { 808 return this.oldLogDir; 809 } 810 811 /** 812 * Get the directory where wals are stored by their RSs 813 * @return the directory where wals are stored by their RSs 814 */ 815 public Path getLogDir() { 816 return this.logDir; 817 } 818 819 /** 820 * Get the handle on the local file system 821 * @return Handle on the local file system 822 */ 823 public FileSystem getFs() { 824 return this.fs; 825 } 826 827 /** 828 * Get the ReplicationPeers used by this ReplicationSourceManager 829 * @return the ReplicationPeers used by this ReplicationSourceManager 830 */ 831 public ReplicationPeers getReplicationPeers() { 832 return this.replicationPeers; 833 } 834 835 /** 836 * Get a string representation of all the sources' metrics 837 */ 838 public String getStats() { 839 StringBuilder stats = new StringBuilder(); 840 // Print stats that apply across all Replication Sources 841 stats.append("Global stats: "); 842 stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=") 843 .append(getTotalBufferLimit()).append("B\n"); 844 for (ReplicationSourceInterface source : this.sources.values()) { 845 stats.append("Normal source for cluster " + source.getPeerId() + ": "); 846 stats.append(source.getStats() + "\n"); 847 } 848 for (ReplicationSourceInterface oldSource : oldsources) { 849 stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": "); 850 stats.append(oldSource.getStats() + "\n"); 851 } 852 return stats.toString(); 853 } 854 855 public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) 856 throws IOException { 857 for (ReplicationSourceInterface source : this.sources.values()) { 858 throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs)); 859 } 860 } 861 862 public void cleanUpHFileRefs(String peerId, List<String> files) { 863 interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files)); 864 } 865 866 int activeFailoverTaskCount() { 867 return executor.getActiveCount(); 868 } 869 870 MetricsReplicationGlobalSourceSource getGlobalMetrics() { 871 return this.globalMetrics; 872 } 873 874 /** 875 * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region. Create it 876 * once only. If exists already, use the existing one. 
877 * @see #removeCatalogReplicationSource(RegionInfo) 878 * @see #addSource(String) This is specialization on the addSource method. 879 */ 880 public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo) 881 throws IOException { 882 // Poor-man's putIfAbsent 883 synchronized (this.catalogReplicationSource) { 884 ReplicationSourceInterface rs = this.catalogReplicationSource.get(); 885 return rs != null 886 ? rs 887 : this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo)); 888 } 889 } 890 891 /** 892 * Remove the hbase:meta Catalog replication source. Called when we close hbase:meta. 893 * @see #addCatalogReplicationSource(RegionInfo regionInfo) 894 */ 895 public void removeCatalogReplicationSource(RegionInfo regionInfo) { 896 // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region 897 // comes back to this server. 898 } 899 900 /** 901 * Create, initialize, and start the Catalog ReplicationSource. Presumes called one-time only 902 * (caller must ensure one-time only call). This ReplicationSource is NOT created via 903 * {@link ReplicationSourceFactory}. 904 * @see #addSource(String) This is a specialization of the addSource call. 905 * @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on 906 * why the special handling). 907 */ 908 private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo) 909 throws IOException { 910 // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the 911 // Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider. 912 WALProvider walProvider = this.walFactory.getMetaWALProvider(); 913 boolean instantiate = walProvider == null; 914 if (instantiate) { 915 walProvider = this.walFactory.getMetaProvider(); 916 } 917 // Here we do a specialization on what {@link ReplicationSourceFactory} does. There is no need 918 // for persisting offset into WALs up in zookeeper (via ReplicationQueueInfo) as the catalog 919 // read replicas feature that makes use of the source does a reset on a crash of the WAL 920 // source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the 921 // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for detail. 922 CatalogReplicationSourcePeer peer = 923 new CatalogReplicationSourcePeer(this.conf, this.clusterId.toString()); 924 final ReplicationSourceInterface crs = new CatalogReplicationSource(); 925 crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(), 926 clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId())); 927 // Add listener on the provider so we can pick up the WAL to replicate on roll. 928 WALActionsListener listener = new WALActionsListener() { 929 @Override 930 public void postLogRoll(Path oldPath, Path newPath) throws IOException { 931 crs.enqueueLog(newPath); 932 } 933 }; 934 walProvider.addWALActionsListener(listener); 935 if (!instantiate) { 936 // If we did not instantiate provider, need to add our listener on already-created WAL 937 // instance too (listeners are passed by provider to WAL instance on creation but if provider 938 // created already, our listener add above is missed). And add the current WAL file to the 939 // Replication Source so it can start replicating it. 
940 WAL wal = walProvider.getWAL(regionInfo); 941 wal.registerWALActionsListener(listener); 942 crs.enqueueLog(((AbstractFSWAL) wal).getCurrentFileName()); 943 } 944 return crs.startup(); 945 } 946 947 ReplicationQueueStorage getQueueStorage() { 948 return queueStorage; 949 } 950}