/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and our only goal is to finish
 * replicating the WAL queues it had</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
 * in order to transfer all the queues to a local old source.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
 * there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
 * operations.</li>
 * <li>Need to synchronize on {@link #walsById}. There are four methods which modify it,
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
 * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
 * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
 * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
 * called by {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
 * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} first, then
 * remove the wals from {@link #walsById}, so there is no race with {@link #removePeer(String)}.
 * The only case that needs synchronization is between
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first, then remove the wals from
 * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
 * will add the wals to {@link #walsByIdRecoveredQueues} first, then start up a
 * {@link ReplicationSourceInterface}. So there is no race here. For
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)},
 * there is already synchronization on {@link #oldsources}.
 * So there is no need to synchronize on {@link #walsByIdRecoveredQueues}.</li>
 * <li>Need to synchronize on {@link #latestPaths} so that a newly opened source does not miss a
 * new log.</li>
 * <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);

  // All the sources that read this RS's logs; every peer has exactly one replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;

  private final ReplicationQueueStorage queueStorage;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  // For a normal replication source, the peer id is the same as the queue id
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  // the map is: queue_id->logPrefix/logGroup->logs
  // For a recovered source, the queue id's format is peer_id-servername-*
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private final Set<Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFileLengthProvider walFileLengthProvider;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final boolean replicationForBulkLoadDataEnabled;

  // Total size of WAL entries currently buffered by the replication sources on this RS
  private AtomicLong totalBufferUsed = new AtomicLong();

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param queueStorage the interface for manipulating replication queues
   * @param replicationPeers the replication peers
   * @param replicationTracker the replication tracker
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of this cluster
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
      ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
      Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
      WALFileLengthProvider walFileLengthProvider) throws IOException {
    // ConcurrentHashMap is thread-safe; reads greatly outnumber modifications.
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover =
        conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
    this.clusterId = clusterId;
    this.walFileLengthProvider = walFileLengthProvider;
    this.replicationTracker.registerListener(this);
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashSet<Path>();
    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all the old region
   * server wal queues.
   * <p>
   * The returned future is for the adoptAbandonedQueues task.
   */
  Future<?> init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
        // when a peer was added before replication for bulk loaded data was enabled.
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
      }
    }
    return this.executor.submit(this::adoptAbandonedQueues);
  }

  private void adoptAbandonedQueues() {
    List<ServerName> currentReplicators = null;
    try {
      currentReplicators = queueStorage.getListOfReplicators();
    } catch (ReplicationException e) {
      server.abort("Failed to get all replicators", e);
      return;
    }
    if (currentReplicators == null || currentReplicators.isEmpty()) {
      return;
    }
    List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
        .map(ServerName::valueOf).collect(Collectors.toList());
    LOG.info(
      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);

    // Look if there's anything to process after a restart
    for (ServerName rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }
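
  // Queue adoption runs on two paths: at startup via adoptAbandonedQueues() above, and when a
  // live region server dies via regionServerRemoved(); both delegate to transferQueues(), which
  // schedules a NodeFailoverWorker.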

  /**
   * <ol>
   * <li>Add peer to replicationPeers</li>
   * <li>Add the normal source and related replication queue</li>
   * <li>Add HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId);
      if (replicationForBulkLoadDataEnabled) {
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
      }
    }
  }

  /**
   * <ol>
   * <li>Remove peer from replicationPeers</li>
   * <li>Remove all the recovered sources for the specified id and related replication queues</li>
   * <li>Remove the normal source and related replication queue</li>
   * <li>Remove HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG.info(
      "Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    } else {
      // This only happens in the unit test TestReplicationSourceManager#testPeerRemovalCleanup.
      // Delete the queue from storage and memory; the queue id is the same as the peer id for a
      // normal source.
      deleteQueue(peerId);
      this.walsById.remove(peerId);
    }

    // Remove HFile Refs
    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
  }

  /**
   * Factory method to create a replication source.
   * @param queueId the id of the replication queue
   * @return the created source
   */
  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
      throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);

    MetricsSource metrics = new MetricsSource(queueId);
    // init replication source
    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
      walFileLengthProvider, metrics);
    return src;
  }

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add a new
   * replication queue to storage. For the newly added peer, we only need to enqueue the latest
   * log of each wal group and do replication.
   * @param peerId the id of the replication peer
   * @return the source that was created
   */
  @VisibleForTesting
  ReplicationSourceInterface addSource(String peerId) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(peerId, walsByGroup);
      // Add the latest wal to that source's queue
      if (this.latestPaths.size() > 0) {
        for (Path logPath : latestPaths) {
          String name = logPath.getName();
          String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
          NavigableSet<String> logs = new TreeSet<>();
          logs.add(name);
          walsByGroup.put(walPrefix, logs);
          // Abort the RS and throw an exception so that adding the peer fails
          abortAndThrowIOExceptionWhenFail(
            () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
          src.enqueueLog(logPath);
        }
      }
    }
    src.startup();
    return src;
  }

  /**
   * Close the previous replication sources of this peer id and open new sources to trigger the
   * new replication state changes or new replication config changes. Here we don't need to change
   * the replication queue storage; we only enqueue all logs to the new replication source.
   * @param peerId the id of the replication peer
   * @throws IOException if creating a replication source fails
   */
  public void refreshSources(String peerId) throws IOException {
    String terminateMessage = "Peer " + peerId +
      " state or config changed. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid race with NodeFailoverWorker
    synchronized (this.oldsources) {
      List<String> previousQueueIds = new ArrayList<>();
      // Use an iterator so we can remove matching old sources while iterating
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator();
          iter.hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          previousQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          iter.remove();
        }
      }
      for (String queueId : previousQueueIds) {
        ReplicationSourceInterface replicationSource = createSource(queueId, peer);
        this.oldsources.add(replicationSource);
        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
          // Enqueue the recovered wals to the freshly created recovered source, not to the new
          // normal source created above
          walsByGroup.forEach(wal -> replicationSource.enqueueLog(new Path(wal)));
        }
        toStartup.add(replicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source.
   * @param src source to clear
   */
  void removeRecoveredSource(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getQueueId());
    this.oldsources.remove(src);
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
  }

  /**
   * Clear the metrics and related replication queue of the specified normal source.
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source.
   * @param queueId the id of the replication queue to delete
   */
  private void deleteQueue(String queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
  }

  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }
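
  // The helpers below all run a ReplicationQueueOperation and differ only in their failure
  // policy: throwIOExceptionWhenFail() rethrows the ReplicationException as an IOException,
  // abortWhenFail() aborts the region server, abortAndThrowIOExceptionWhenFail() does both, and
  // interruptOrAbortWhenFail() additionally tolerates the interrupt caused by terminating a
  // source. Typical usage:
  //   throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));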

  /**
   * Refreshing a replication source will terminate the old source first, and then the source
   * thread will be interrupted. Handle that case here instead of aborting the region server.
   */
  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
          && e.getCause().getCause() != null
          && e.getCause().getCause() instanceof InterruptedException) {
        throw new RuntimeException(
          "Thread is interrupted, the replication source may be terminated");
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
  }

  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }

  /**
   * This method will log the current position to storage and also clean old logs from the
   * replication queue.
   * @param queueId id of the replication queue
   * @param queueRecovered indicates if this queue comes from another region server
   * @param entryBatch the wal entry batch we just shipped
   */
  public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
      WALEntryBatch entryBatch) {
    String fileName = entryBatch.getLastWalPath().getName();
    interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
      queueId, fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
  }

  /**
   * Cleans a log file and all older logs from the replication queue. Called when we are sure that
   * a log file is closed and has no more entries.
   * @param log Path to the log
   * @param inclusive whether we should also remove the given log file
   * @param queueId id of the replication queue
   * @param queueRecovered whether this is a recovered queue
   */
  @VisibleForTesting
  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (queueRecovered) {
      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
      if (wals != null) {
        cleanOldLogs(wals, log, inclusive, queueId);
      }
    } else {
      // synchronized on walsById to avoid race with preLogRoll
      synchronized (this.walsById) {
        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
        if (wals != null) {
          cleanOldLogs(wals, log, inclusive, queueId);
        }
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
    NavigableSet<String> walSet = wals.headSet(key, inclusive);
    if (walSet.isEmpty()) {
      return;
    }
    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
    for (String wal : walSet) {
      interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
    }
    walSet.clear();
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  @VisibleForTesting
  public void preLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths so that a newly opened source does not miss the new log
    synchronized (this.latestPaths) {
      // Add log to queue storage
      for (ReplicationSourceInterface source : this.sources.values()) {
        // If recording the log in queue storage fails, abort the RS and throw an exception so
        // that the log roll fails
        abortAndThrowIOExceptionWhenFail(
          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
      }

      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
            .entrySet()) {
          String peerId = entry.getKey();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, there is no need to keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
      Iterator<Path> iterator = latestPaths.iterator();
      while (iterator.hasNext()) {
        Path path = iterator.next();
        if (path.getName().contains(logPrefix)) {
          iterator.remove();
          break;
        }
      }
      this.latestPaths.add(newLog);
    }
  }
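
  // preLogRoll() above registers the upcoming wal in queue storage and in the in-memory maps;
  // postLogRoll() below then hands the rolled wal to every normal source so shipping can continue
  // from it.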

  // public because we call it in TestReplicationEmptyWALRecovery
  @VisibleForTesting
  public void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
    }
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(ServerName.valueOf(regionserver));
  }

  /**
   * Transfer all the queues of the specified dead region server to this region server. First it
   * tries to grab a lock and, if that succeeds, it moves the old queues and finally deletes them.
   * <p>
   * It creates one old source for any type of source of the old rs.
   */
  private void transferQueues(ServerName deadRS) {
    if (server.getServerName().equals(deadRS)) {
      // it's just us, give up
      return;
    }
    NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
        .getGlobalSource().incrFailedRecoveryQueue();
      LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the queues from dead
   * region servers.
   */
  class NodeFailoverWorker extends Thread {

    private final ServerName deadRS;
    // After claiming the queues from a dead region server, the NodeFailoverWorker will skip
    // starting the RecoveredReplicationSource if the peer has been removed. But it is possible
    // that a peer with peerId = 2 is removed and a peer with peerId = 2 is added again while the
    // NodeFailoverWorker runs. So we need a deep-copied <peerId, peer> map to decide whether we
    // should start the RecoveredReplicationSource. If the latest peer is not the old peer from
    // when the NodeFailoverWorker began, we should skip starting the RecoveredReplicationSource;
    // otherwise the rs will abort (see HBASE-20475).
    private final Map<String, ReplicationPeerImpl> peersSnapshot;

    @VisibleForTesting
    public NodeFailoverWorker(ServerName deadRS) {
      super("Failover-for-" + deadRS);
      this.deadRS = deadRS;
      peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
    }

    private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
      ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
      return oldPeerRef != null && oldPeerRef == newPeerRef;
    }

    @Override
    public void run() {
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
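      // The random extra delay (up to another sleepBeforeFailover) staggers the surviving region
      // servers so they are less likely to all try to claim the dead server's queues at once.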
      try {
        Thread.sleep(sleepBeforeFailover +
          (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      Map<String, Set<String>> newQueues = new HashMap<>();
      try {
        List<String> queues = queueStorage.getAllQueues(deadRS);
        while (!queues.isEmpty()) {
          Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
            queues.get(ThreadLocalRandom.current().nextInt(queues.size())),
            server.getServerName());
          long sleep = sleepBeforeFailover / 2;
          if (!peer.getSecond().isEmpty()) {
            newQueues.put(peer.getFirst(), peer.getSecond());
            sleep = sleepBeforeFailover;
          }
          try {
            Thread.sleep(sleep);
          } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting before transferring a queue.");
            Thread.currentThread().interrupt();
          }
          queues = queueStorage.getAllQueues(deadRS);
        }
        if (queues.isEmpty()) {
          queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
        }
      } catch (ReplicationException e) {
        LOG.error("ReplicationException: cannot claim dead region server ({})'s replication"
            + " queue. Znode: ({})."
            + " Possible solution: check if znode size exceeds jute.maxBuffer value."
            + " If so, increase it for both client and server side.",
          deadRS, queueStorage.getRsNode(deadRS), e);
        server.abort("Failed to claim queue from dead regionserver.", e);
        return;
      }
      // Copying over the failed queue is completed.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
        String queueId = entry.getKey();
        Set<String> walsSet = entry.getValue();
        try {
          // there is no actual peer defined corresponding to peerId for the failover.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
          String actualPeerId = replicationQueueInfo.getPeerId();

          ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
          if (peer == null || !isOldPeer(actualPeerId, peer)) {
            LOG.warn("Skipping failover for peer {} of node {}, peer is null", actualPeerId,
              deadRS);
            abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
            continue;
          }
          if (server instanceof ReplicationSyncUp.DummyServer
              && peer.getPeerState().equals(PeerState.DISABLED)) {
            LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
                + "replicating data to this peer.",
              actualPeerId);
            continue;
          }
          // track sources in walsByIdRecoveredQueues
          Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
          walsByIdRecoveredQueues.put(queueId, walsByGroup);
          for (String wal : walsSet) {
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
            NavigableSet<String> wals = walsByGroup.get(walPrefix);
            if (wals == null) {
              wals = new TreeSet<>();
              walsByGroup.put(walPrefix, wals);
            }
            wals.add(wal);
          }

          ReplicationSourceInterface src = createSource(queueId, peer);
          // synchronized on oldsources to avoid adding recovered source for the to-be-removed
          // peer
          synchronized (oldsources) {
            peer = replicationPeers.getPeer(src.getPeerId());
            if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              removeRecoveredSource(src);
              continue;
            }
            oldsources.add(src);
            for (String wal : walsSet) {
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Terminate the replication on this region server.
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get the wals of the normal sources on this rs.
   * @return an unmodifiable map of peer id to wal group to the sorted set of wal names
   */
  @VisibleForTesting
  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get the wals of the recovered sources on this rs.
   * @return an unmodifiable map of queue id to wal group to the sorted set of wal names
   */
  @VisibleForTesting
  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs.
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return new ArrayList<>(this.sources.values());
  }

  /**
   * Get a list of all the recovered sources of this rs.
   * @return list of all recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }
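
  // Note: unlike getSources(), which returns a copy, getOldSources() above returns the live list;
  // this class synchronizes on oldsources whenever it mutates that list.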

  /**
   * Get the normal source for a given peer.
   * @return the normal source for the given peer if it exists, otherwise null
   */
  @VisibleForTesting
  public ReplicationSourceInterface getSource(String peerId) {
    return this.sources.get(peerId);
  }

  @VisibleForTesting
  List<String> getAllQueues() throws IOException {
    List<String> allQueues = Collections.emptyList();
    try {
      allQueues = queueStorage.getAllQueues(server.getServerName());
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    return allQueues;
  }

  @VisibleForTesting
  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  @VisibleForTesting
  public AtomicLong getTotalBufferUsed() {
    return totalBufferUsed;
  }

  /**
   * Get the directory where wals are archived.
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs.
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system.
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager.
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {
    return this.replicationPeers;
  }

  /**
   * Get a string representation of all the sources' metrics.
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    for (ReplicationSourceInterface source : this.sources.values()) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws IOException {
    for (ReplicationSourceInterface source : this.sources.values()) {
      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
  }

  int activeFailoverTaskCount() {
    return executor.getActiveCount();
  }
}