/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and the only goal is to finish
 * replicating the WAL queues it had</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
 * in order to transfer all the queues to local old sources.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
 * there is a Lock for each peer id in {@link PeerProcedureHandlerImpl}, so there is no race for
 * peer operations.</li>
 * <li>Need to synchronize on {@link #walsById}. There are four methods which modify it:
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
 * {@link #walsById} is a ConcurrentHashMap and there is a Lock for each peer id in
 * {@link PeerProcedureHandlerImpl}, so there is no race between {@link #addPeer(String)} and
 * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
 * called by {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
 * {@link #removePeer(String)} terminates the {@link ReplicationSourceInterface} first and then
 * removes the wals from {@link #walsById}, so there is no race with {@link #removePeer(String)}.
 * The only case that needs synchronization is between
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it: {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} terminates the
 * {@link ReplicationSourceInterface} first and then removes the wals from
 * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
 * adds the wals to {@link #walsByIdRecoveredQueues} first and then starts up a
 * {@link ReplicationSourceInterface}, so there is no race here. For
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, both
 * already synchronize on {@link #oldsources}, so there is no need to synchronize on
 * {@link #walsByIdRecoveredQueues}.</li>
 * <li>Need to synchronize on {@link #latestPaths} to avoid a newly opened source missing new
 * logs.</li>
 * <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a peer
 * that is being removed.</li>
 * </ul>
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // all the sources that read this RS's logs and every peer only has one replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueueStorage queueStorage;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  // For normal replication source, the peer id is the same as the queue id
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  // the map is: queue_id->logPrefix/logGroup->logs
  // For recovered source, the queue id's format is peer_id-servername-*
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private final Set<Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFileLengthProvider walFileLengthProvider;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final boolean replicationForBulkLoadDataEnabled;

  private AtomicLong totalBufferUsed = new AtomicLong();

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param queueStorage the interface for manipulating replication queues
   * @param replicationPeers the interface for tracking replication peers
   * @param replicationTracker the tracker used to watch the other region servers
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of this cluster
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
      ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
      Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
      WALFileLengthProvider walFileLengthProvider) throws IOException {
    // sources is read far more often than it is modified, so a thread-safe ConcurrentHashMap is
    // sufficient and no extra locking is needed for lookups.
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    // 30 seconds
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
    this.clusterId = clusterId;
    this.walFileLengthProvider = walFileLengthProvider;
    this.replicationTracker.registerListener(this);
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashSet<Path>();
    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all old region server
   * wal queues
   * <p>
   * The returned future is for adoptAbandonedQueues task.
   */
  Future<?> init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
        // when a peer was added before replication for bulk loaded data was enabled.
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
      }
    }
    return this.executor.submit(this::adoptAbandonedQueues);
  }

  private void adoptAbandonedQueues() {
    List<ServerName> currentReplicators = null;
    try {
      currentReplicators = queueStorage.getListOfReplicators();
    } catch (ReplicationException e) {
      server.abort("Failed to get all replicators", e);
      return;
    }
    if (currentReplicators == null || currentReplicators.isEmpty()) {
      return;
    }
    List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
        .map(ServerName::valueOf).collect(Collectors.toList());
    LOG.info(
      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);

    // Look if there's anything to process after a restart
    for (ServerName rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }
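
  // Illustrative sketch only (the variable names below are made up; the real call site lives in
  // the owning replication service, not in this class): at region server startup the manager is
  // constructed and init() is driven roughly like
  //
  //   ReplicationSourceManager manager = new ReplicationSourceManager(queueStorage, peers,
  //     tracker, conf, server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider);
  //   Future<?> adoption = manager.init(); // starts one normal source per peer, then adopts
  //                                        // abandoned queues of dead RSs in the background
  //   adoption.get();                      // e.g. tests may wait for the adoption task to finish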

  /**
   * Add a new peer:
   * <ol>
   * <li>Add the peer to replicationPeers</li>
   * <li>Add the normal source and the related replication queue</li>
   * <li>Add HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId);
      if (replicationForBulkLoadDataEnabled) {
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
      }
    }
  }

  /**
   * Remove a peer:
   * <ol>
   * <li>Remove the peer from replicationPeers</li>
   * <li>Remove all the recovered sources for the specified id and the related replication
   * queues</li>
   * <li>Remove the normal source and the related replication queue</li>
   * <li>Remove HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG.info(
      "Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    } else {
      // This only happens in the unit test TestReplicationSourceManager#testPeerRemovalCleanup
      // Delete queue from storage and memory; the queue id is the same as the peer id for a
      // normal source
      deleteQueue(peerId);
      this.walsById.remove(peerId);
    }

    // Remove HFile Refs
    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
  }

  /**
   * Factory method to create a replication source
   * @param queueId the id of the replication queue
   * @return the created source
   */
  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
      throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);

    MetricsSource metrics = new MetricsSource(queueId);
    // init replication source
    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
      walFileLengthProvider, metrics);
    return src;
  }
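
  // Illustrative note on queue ids (the server name below is made up): a normal source's queue id
  // is the peer id itself, while a recovered queue id also carries the dead server name(s), e.g.
  //   ReplicationQueueInfo info = new ReplicationQueueInfo("2-rs1.example.org,16020,1588230740");
  //   info.getPeerId();        // -> "2"
  //   info.isQueueRecovered(); // -> true
  // which is how NodeFailoverWorker maps a claimed queue back to its peer before creating a source.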

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add a new replication
   * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
   * group and start replicating from there.
   * @param peerId the id of the replication peer
   * @return the source that was created
   */
  @VisibleForTesting
  ReplicationSourceInterface addSource(String peerId) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(peerId, walsByGroup);
      // Add the latest wal to that source's queue
      if (this.latestPaths.size() > 0) {
        for (Path logPath : latestPaths) {
          String name = logPath.getName();
          String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
          NavigableSet<String> logs = new TreeSet<>();
          logs.add(name);
          walsByGroup.put(walPrefix, logs);
          // Abort the RS and throw an exception so that the add peer operation fails
          abortAndThrowIOExceptionWhenFail(
            () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
          src.enqueueLog(logPath);
        }
      }
    }
    src.startup();
    return src;
  }
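
  // Illustrative note on wal groups (the wal name below is made up): walsById keys each peer's
  // wals by the wal name prefix, so every roll of the same wal group lands in the same sorted set:
  //   String wal = "rs1.example.org%2C16020%2C1588230740.1588230740123";
  //   AbstractFSWALProvider.getWALPrefixFromWALName(wal);
  //   // -> "rs1.example.org%2C16020%2C1588230740"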

  /**
   * Close the previous replication sources of this peer id and open new sources to trigger the new
   * replication state changes or new replication config changes. Here we don't need to change
   * replication queue storage, only to enqueue all logs to the new replication source.
   * @param peerId the id of the replication peer
   * @throws IOException if we fail to create the new replication source
   */
  public void refreshSources(String peerId) throws IOException {
    String terminateMessage = "Peer " + peerId +
      " state or config changed. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
        walsByGroup.forEach(wal -> {
          Path walPath = new Path(this.logDir, wal);
          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
        });
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid race with NodeFailoverWorker
    synchronized (this.oldsources) {
      List<String> previousQueueIds = new ArrayList<>();
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator();
          iter.hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          previousQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          iter.remove();
        }
      }
      for (String queueId : previousQueueIds) {
        ReplicationSourceInterface replicationSource = createSource(queueId, peer);
        this.oldsources.add(replicationSource);
        LOG.trace("Added source for recovered queue: " + queueId);
        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
          walsByGroup.forEach(wal -> {
            LOG.trace("Enqueueing log from recovered queue for source: {}",
              replicationSource.getQueueId());
            // enqueue to the newly created recovered source, not to the normal source above
            replicationSource.enqueueLog(new Path(wal));
          });
        }
        toStartup.add(replicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source
   * @param src source to clear
   */
  void removeRecoveredSource(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getQueueId());
    this.oldsources.remove(src);
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
  }

  /**
   * Clear the metrics and related replication queue of the specified normal source
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source
   * @param queueId the id of the replication queue to delete
   */
  private void deleteQueue(String queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
  }

  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }

  /**
   * Refreshing a replication source terminates the old source first, so the source thread may be
   * interrupted. We need to handle that case here instead of aborting the region server.
   */
  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
          && e.getCause().getCause() != null
          && e.getCause().getCause() instanceof InterruptedException) {
        // ReplicationRuntimeException (a RuntimeException) is thrown out here. The reason is
        // that the thread is interrupted deep down in the stack, so it should bypass the following
        // processing logic and propagate to the top layer which can handle this exception
        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
        throw new ReplicationRuntimeException(
          "Thread is interrupted, the replication source may be terminated",
          e.getCause().getCause());
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
  }

  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }

  /**
   * This method will log the current position to storage and also clean old logs from the
   * replication queue.
   * @param queueId id of the replication queue
   * @param queueRecovered indicates if this queue comes from another region server
   * @param entryBatch the wal entry batch we just shipped
   */
  public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
      WALEntryBatch entryBatch) {
    String fileName = entryBatch.getLastWalPath().getName();
    interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
      queueId, fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
  }
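
  // Illustrative sketch of the expected caller (the variable names are made up): a shipper thread
  // that has just shipped a batch records its position and prunes fully replicated wals, roughly:
  //   WALEntryBatch batch = ...;                                      // last shipped batch
  //   manager.logPositionAndCleanOldLogs(queueId, recovered, batch);  // queueId/recovered come
  //                                                                   // from the owning source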

  /**
   * Cleans a log file and all older logs from the replication queue. Called when we are sure that
   * a log file is closed and has no more entries.
   * @param log Path to the log
   * @param inclusive whether we should also remove the given log file
   * @param queueId id of the replication queue
   * @param queueRecovered Whether this is a recovered queue
   */
  @VisibleForTesting
  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (queueRecovered) {
      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
      if (wals != null) {
        cleanOldLogs(wals, log, inclusive, queueId);
      }
    } else {
      // synchronized on walsById to avoid race with preLogRoll
      synchronized (this.walsById) {
        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
        if (wals != null) {
          cleanOldLogs(wals, log, inclusive, queueId);
        }
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
    NavigableSet<String> walSet = wals.headSet(key, inclusive);
    if (walSet.isEmpty()) {
      return;
    }
    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
    for (String wal : walSet) {
      interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
    }
    walSet.clear();
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  @VisibleForTesting
  public void preLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths so that a newly opened source does not miss the new log
    synchronized (this.latestPaths) {
      // Add log to queue storage
      for (ReplicationSourceInterface source : this.sources.values()) {
        // If recording the log in queue storage fails, abort the RS and throw an exception so
        // that the log roll fails
        abortAndThrowIOExceptionWhenFail(
          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
      }

      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
            .entrySet()) {
          String peerId = entry.getKey();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, there is no need to keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
      Iterator<Path> iterator = latestPaths.iterator();
      while (iterator.hasNext()) {
        Path path = iterator.next();
        if (path.getName().contains(logPrefix)) {
          iterator.remove();
          break;
        }
      }
      this.latestPaths.add(newLog);
    }
  }
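
  // Illustrative ordering sketch: the wal roller is expected to invoke the two hooks around a
  // roll, first persisting the new wal into every queue, then handing it to the sources, roughly:
  //   manager.preLogRoll(newWalPath);  // record the new wal in queue storage and walsById
  //   manager.postLogRoll(newWalPath); // enqueue the new wal to every normal source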

  // public because we call it in TestReplicationEmptyWALRecovery
  @VisibleForTesting
  public void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", newLog,
        source.getQueueId());
    }
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(ServerName.valueOf(regionserver));
  }

  /**
   * Transfer all the queues of the specified region server to this region server. First it tries
   * to grab a lock and if it works it will move the old queues and finally will delete the old
   * queues.
   * <p>
   * It creates one old source for any type of source of the old rs.
   */
  private void transferQueues(ServerName deadRS) {
    if (server.getServerName().equals(deadRS)) {
      // it's just us, give up
      return;
    }
    NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
          .getGlobalSource().incrFailedRecoveryQueue();
      LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the queues from dead
   * region servers.
   */
  class NodeFailoverWorker extends Thread {

    private final ServerName deadRS;
    // After claiming the queues from the dead region server, the NodeFailoverWorker will skip
    // starting the RecoveredReplicationSource if the peer has been removed. But it is possible
    // that a peer with peerId = 2 is removed and another peer with peerId = 2 is added again while
    // the NodeFailoverWorker is pending. So we need a snapshot of the <peerId, peer> map to decide
    // whether we should start the RecoveredReplicationSource. If the latest peer is not the same
    // object as the old peer when the NodeFailoverWorker began, we skip starting the
    // RecoveredReplicationSource, otherwise the rs would abort (see HBASE-20475).
    private final Map<String, ReplicationPeerImpl> peersSnapshot;

    @VisibleForTesting
    public NodeFailoverWorker(ServerName deadRS) {
      super("Failover-for-" + deadRS);
      this.deadRS = deadRS;
      peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
    }

    private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
      ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
      return oldPeerRef != null && oldPeerRef == newPeerRef;
    }
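
    // Illustrative note (peer id "2" is just an example): isOldPeer deliberately compares
    // references, not equals(), so a peer that was removed and re-added with the same id while
    // this worker was pending is treated as a different peer and its recovered queue is skipped:
    //   ReplicationPeerImpl current = replicationPeers.getPeer("2");
    //   isOldPeer("2", current); // false if peer "2" was dropped and re-created since the snapshot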

    @Override
    public void run() {
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover +
          (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      Map<String, Set<String>> newQueues = new HashMap<>();
      try {
        List<String> queues = queueStorage.getAllQueues(deadRS);
        while (!queues.isEmpty()) {
          Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
            queues.get(ThreadLocalRandom.current().nextInt(queues.size())),
            server.getServerName());
          long sleep = sleepBeforeFailover / 2;
          if (!peer.getSecond().isEmpty()) {
            newQueues.put(peer.getFirst(), peer.getSecond());
            sleep = sleepBeforeFailover;
          }
          try {
            Thread.sleep(sleep);
          } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting before transferring a queue.");
            Thread.currentThread().interrupt();
          }
          queues = queueStorage.getAllQueues(deadRS);
        }
        if (queues.isEmpty()) {
          queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
        }
      } catch (ReplicationException e) {
        LOG.error("ReplicationException: cannot claim dead region server ({})'s replication "
            + "queue. Znode: ({}). Possible solution: check if znode size exceeds jute.maxBuffer "
            + "value. If so, increase it for both client and server side.",
          deadRS, queueStorage.getRsNode(deadRS), e);
        server.abort("Failed to claim queue from dead regionserver.", e);
        return;
      }
      // Copying over the failed queue is completed.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

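      // Each claimed entry maps a recovered queue id (of the form "<peerId>-<deadServerName>")
      // to the wal names left in that queue; we create one recovered source per claimed queue.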
      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
        String queueId = entry.getKey();
        Set<String> walsSet = entry.getValue();
        try {
          // there is not an actual peer defined corresponding to peerId for the failover.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
          String actualPeerId = replicationQueueInfo.getPeerId();

          ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
          if (peer == null || !isOldPeer(actualPeerId, peer)) {
            LOG.warn("Skipping failover for peer {} of node {}, peer is null", actualPeerId,
              deadRS);
            abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
            continue;
          }
          if (server instanceof ReplicationSyncUp.DummyServer
              && peer.getPeerState().equals(PeerState.DISABLED)) {
            LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
                + "replicating data to this peer.", actualPeerId);
            continue;
          }
          // track sources in walsByIdRecoveredQueues
          Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
          walsByIdRecoveredQueues.put(queueId, walsByGroup);
          for (String wal : walsSet) {
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
            NavigableSet<String> wals = walsByGroup.get(walPrefix);
            if (wals == null) {
              wals = new TreeSet<>();
              walsByGroup.put(walPrefix, wals);
            }
            wals.add(wal);
          }

          ReplicationSourceInterface src = createSource(queueId, peer);
          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
          synchronized (oldsources) {
            peer = replicationPeers.getPeer(src.getPeerId());
            if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              removeRecoveredSource(src);
              continue;
            }
            oldsources.add(src);
            for (String wal : walsSet) {
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get the wals of the normal sources on this rs
   * @return an unmodifiable view of the map of queue id to wal group to sorted wal names
   */
  @VisibleForTesting
  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get the wals of the recovered sources on this rs
   * @return an unmodifiable view of the map of recovered queue id to wal group to sorted wal names
   */
  @VisibleForTesting
  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return new ArrayList<>(this.sources.values());
  }

  /**
   * Get a list of all the recovered sources of this rs
   * @return list of all recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  /**
   * Get the normal source for a given peer
   * @return the normal source for the given peer if it exists, otherwise null.
   */
  @VisibleForTesting
  public ReplicationSourceInterface getSource(String peerId) {
    return this.sources.get(peerId);
  }

  @VisibleForTesting
  List<String> getAllQueues() throws IOException {
    List<String> allQueues = Collections.emptyList();
    try {
      allQueues = queueStorage.getAllQueues(server.getServerName());
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    return allQueues;
  }

  @VisibleForTesting
  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  @VisibleForTesting
  public AtomicLong getTotalBufferUsed() {
    return totalBufferUsed;
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {
    return this.replicationPeers;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    for (ReplicationSourceInterface source : this.sources.values()) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws IOException {
    for (ReplicationSourceInterface source : this.sources.values()) {
      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
  }

  int activeFailoverTaskCount() {
    return executor.getActiveCount();
  }
}