/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and their only goal is to finish
 * replicating the WAL queue that server had</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a
 * lock in order to transfer all the queues to a local old source.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and
 * there is a Lock for each peer id in {@link PeerProcedureHandlerImpl}, so there is no race for
 * peer operations.</li>
 * <li>Synchronization is needed on {@link #walsById}. There are four methods which modify it:
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
 * {@link #walsById} is a ConcurrentHashMap and there is a Lock for each peer id in
 * {@link PeerProcedureHandlerImpl}, so there is no race between {@link #addPeer(String)} and
 * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
 * called by {@link ReplicationSourceInterface}, so there is no race with
 * {@link #addPeer(String)}. {@link #removePeer(String)} terminates the
 * {@link ReplicationSourceInterface} first and then removes the wals from {@link #walsById}, so
 * there is no race with {@link #removePeer(String)} either. The only case that needs
 * synchronization is between {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link #preLogRoll(Path)}.</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it: {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} terminates the
 * {@link ReplicationSourceInterface} first and then removes the wals from
 * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
 * adds the wals to {@link #walsByIdRecoveredQueues} first and then starts up a
 * {@link ReplicationSourceInterface}, so there is no race here. For
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)},
 * there is already synchronization on {@link #oldsources}, so there is no need to synchronize on
 * {@link #walsByIdRecoveredQueues}.</li>
 * <li>Synchronization is needed on {@link #latestPaths} so that a newly opened source does not
 * miss a new log.</li>
 * <li>Synchronization is needed on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
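 * <p>
 * As an illustration only (a sketch, not part of the class contract), the {@link #latestPaths}
 * rule above means that adding a source and rolling a wal serialize on the same monitor, so a
 * new source cannot miss a concurrently rolled log:
 *
 * <pre>
 * synchronized (latestPaths) {
 *   sources.put(peerId, src);          // addSource: register the new source
 *   for (Path logPath : latestPaths) { // then read the current wal of every group
 *     src.enqueueLog(logPath);
 *   }
 * } // preLogRoll takes the same monitor before adding a new wal to latestPaths
 * </pre>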
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // all the sources that read this RS's logs; every peer has only one replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;

  /**
   * Storage for queues that need persistence; e.g. replication state, so it can be recovered
   * after a crash. queueStorage upkeep is spread about this class and passed to
   * ReplicationSource instances for these to do updates themselves. Not all ReplicationSource
   * instances keep state.
   */
  private final ReplicationQueueStorage queueStorage;

  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  // For a normal replication source, the peer id is the same as the queue id
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  // the map is: queue_id->logPrefix/logGroup->logs
  // For a recovered source, the queue id's format is peer_id-servername-*
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private final Set<Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFactory walFactory;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final boolean replicationForBulkLoadDataEnabled;

  private AtomicLong totalBufferUsed = new AtomicLong();
  // Total buffer size on this RegionServer for holding batched edits to be shipped.
  private final long totalBufferLimit;
  private final MetricsReplicationGlobalSourceSource globalMetrics;

  /**
   * A special ReplicationSource for hbase:meta Region Read Replicas.
   * Usually this reference remains empty. If an hbase:meta Region is opened on this server, we
   * will create an instance of an hbase:meta CatalogReplicationSource and it will live the life
   * of the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away
   * from this server (in case it later gets moved back). We synchronize on this instance when
   * testing for presence and, if absent, while creating it, so it is only created and started
   * once.
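   * <p>
   * Sketch of the guarded lazy-create idiom used here ({@code newSource} is a placeholder; see
   * {@link #addCatalogReplicationSource(RegionInfo)} for the real code):
   *
   * <pre>
   * synchronized (catalogReplicationSource) {
   *   ReplicationSourceInterface rs = catalogReplicationSource.get();
   *   return rs != null ? rs : catalogReplicationSource.getAndSet(newSource);
   * }
   * </pre>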
   */
  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param queueStorage the interface for manipulating replication queues
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
      ReplicationPeers replicationPeers, ReplicationTracker replicationTracker,
      Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir,
      UUID clusterId, WALFactory walFactory,
      MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
    // ConcurrentHashMap is thread-safe; generally, reads outnumber modifications.
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30
                                                                                         // seconds
    this.clusterId = clusterId;
    this.walFactory = walFactory;
    this.replicationTracker.registerListener(this);
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashSet<Path>();
    replicationForBulkLoadDataEnabled = conf.getBoolean(
      HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.globalMetrics = globalMetrics;
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all the old region
   * server wal queues.
   * <p>
   * The returned future is for the adoptAbandonedQueues task.
   */
  Future<?> init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if the peer exists in the hfile-refs queue; if not, add it. This can happen
        // when a peer was added before replication for bulk loaded data was enabled.
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
      }
    }
    return this.executor.submit(this::adoptAbandonedQueues);
  }
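  // A minimal usage sketch of the lifecycle above (hypothetical caller; in practice the
  // Replication service owns this manager, which is an assumption about wiring):
  //
  //   ReplicationSourceManager manager = ...;  // built with the constructor above
  //   Future<?> adoption = manager.init();     // starts a source per peer
  //   adoption.get();                          // optionally wait for abandoned-queue adoption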
  private void adoptAbandonedQueues() {
    List<ServerName> currentReplicators = null;
    try {
      currentReplicators = queueStorage.getListOfReplicators();
    } catch (ReplicationException e) {
      server.abort("Failed to get all replicators", e);
      return;
    }
    if (currentReplicators == null || currentReplicators.isEmpty()) {
      return;
    }
    List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
        .map(ServerName::valueOf).collect(Collectors.toList());
    LOG.info(
      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);

    // Look if there's anything to process after a restart
    for (ServerName rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }

  /**
   * <ol>
   * <li>Add the peer to replicationPeers</li>
   * <li>Add the normal source and the related replication queue</li>
   * <li>Add HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId);
      if (replicationForBulkLoadDataEnabled) {
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
      }
    }
  }

  /**
   * <ol>
   * <li>Remove the peer from replicationPeers</li>
   * <li>Remove all the recovered sources for the specified id and the related replication
   * queues</li>
   * <li>Remove the normal source and the related replication queue</li>
   * <li>Remove HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding a recovered source for a to-be-removed peer;
    // see NodeFailoverWorker.run
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG.info(
      "Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    } else {
      // This only happens in the unit test TestReplicationSourceManager#testPeerRemovalCleanup.
      // Delete the queue from storage and memory; the queue id is the same as the peer id for a
      // normal source.
      deleteQueue(peerId);
      this.walsById.remove(peerId);
    }

    // Remove HFile Refs
    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
  }
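  // Sketch of how addPeer/removePeer are reached (hedged; per the class javadoc the per-peer
  // lock lives in PeerProcedureHandlerImpl, so the exact call shape below is illustrative, not
  // this class's API):
  //
  //   lock(peerId);
  //   try {
  //     manager.addPeer(peerId);       // on a peer-add procedure
  //     // or manager.removePeer(peerId) on a peer-remove procedure
  //   } finally {
  //     unlock(peerId);
  //   }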
  /**
   * @return a new 'classic' user-space replication source.
   * @param queueId the id of the replication queue to associate the ReplicationSource with.
   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
   */
  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
      throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
    // Init the just-created replication source. Pass the default walProvider's wal file length
    // provider. The presumption is we replicate user-space Tables only. For hbase:meta region
    // replica replication, see #createCatalogReplicationSource().
    WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null
        ? this.walFactory.getWALProvider().getWALFileLengthProvider()
        : p -> OptionalLong.empty();
    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
      walFileLengthProvider, new MetricsSource(queueId));
    return src;
  }

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add a new
   * replication queue to storage. For the newly added peer, we only need to enqueue the latest
   * log of each wal group and do replication.
   * @param peerId the id of the replication peer
   * @return the source that was created
   */
  ReplicationSourceInterface addSource(String peerId) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(peerId, walsByGroup);
      // Add the latest wal to that source's queue
      if (this.latestPaths.size() > 0) {
        for (Path logPath : latestPaths) {
          String name = logPath.getName();
          String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
          NavigableSet<String> logs = new TreeSet<>();
          logs.add(name);
          walsByGroup.put(walPrefix, logs);
          // Abort the RS and throw an exception so the peer addition fails
          abortAndThrowIOExceptionWhenFail(
            () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
          src.enqueueLog(logPath);
        }
      }
    }
    src.startup();
    return src;
  }

  /**
   * Close the previous replication sources of this peer id and open new sources to trigger the
   * new replication state changes or new replication config changes. Here we don't need to
   * change the replication queue storage; we only need to enqueue all logs to the new
   * replication source.
   * @param peerId the id of the replication peer
   */
  public void refreshSources(String peerId) throws IOException {
    String terminateMessage = "Peer " + peerId +
      " state or config changed. Will close the previous replication source and open a new one";
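    // Note (restating the javadoc contract at the point it matters): the replication queue
    // storage is deliberately left untouched here; the replacement source reads the same queue,
    // so below we only re-enqueue the wals already tracked in memory.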
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
        walsByGroup.forEach(wal -> {
          Path walPath = new Path(this.logDir, wal);
          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source creation.", walPath,
            src.getQueueId());
        });
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid a race with NodeFailoverWorker
    synchronized (this.oldsources) {
      List<String> previousQueueIds = new ArrayList<>();
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator();
          iter.hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          previousQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          iter.remove();
        }
      }
      for (String queueId : previousQueueIds) {
        ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
        this.oldsources.add(recoveredReplicationSource);
        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
        }
        toStartup.add(recoveredReplicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source
   * @param src source to clear
   */
  void removeRecoveredSource(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getQueueId());
    this.oldsources.remove(src);
    // Delete the queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
  }

  /**
   * Clear the metrics and related replication queue of the specified source
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete the queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source
   * @param queueId the id of the replication queue to delete
   */
  private void deleteQueue(String queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
  }

  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }
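  // The helpers below all take a ReplicationQueueOperation lambda so each call site can pick
  // its own failure policy, e.g. (taken from usages in this class):
  //
  //   abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
  //   throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));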
  /**
   * Refreshing a replication source will terminate the old source first; then the source thread
   * will be interrupted. We need to handle this case instead of aborting the region server.
   */
  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
          && e.getCause().getCause() != null
          && e.getCause().getCause() instanceof InterruptedException) {
        // ReplicationRuntimeException (a RuntimeException) is thrown out here. The reason is
        // that the thread was interrupted deep down in the stack; it should bypass the following
        // processing logic and propagate to the topmost layer which can handle this exception
        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
        throw new ReplicationRuntimeException(
          "Thread is interrupted, the replication source may be terminated",
          e.getCause().getCause());
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
  }

  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }

  /**
   * This method will log the current position to storage and also clean old logs from the
   * replication queue.
   * @param entryBatch the wal entry batch we just shipped
   */
  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
      WALEntryBatch entryBatch) {
    String fileName = entryBatch.getLastWalPath().getName();
    String queueId = source.getQueueId();
    interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
      queueId, fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, source.isRecovered());
  }

  /**
   * Cleans a log file and all older logs from the replication queue. Called when we are sure
   * that a log file is closed and has no more entries.
   * @param log Path to the log
   * @param inclusive whether we should also remove the given log file
   * @param queueId id of the replication queue
   * @param queueRecovered Whether this is a recovered queue
   */
  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (queueRecovered) {
      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
      if (wals != null) {
        cleanOldLogs(wals, log, inclusive, queueId);
      }
    } else {
      // synchronized on walsById to avoid race with preLogRoll
      synchronized (this.walsById) {
        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
        if (wals != null) {
          cleanOldLogs(wals, log, inclusive, queueId);
        }
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
    NavigableSet<String> walSet = wals.headSet(key, inclusive);
    if (walSet.isEmpty()) {
      return;
    }
    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
    for (String wal : walSet) {
      interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
    }
    walSet.clear();
  }
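  // Worked example (illustrative wal names): given wals = {"rs%2C1.100", "rs%2C1.200",
  // "rs%2C1.300"} sorted by name, cleanOldLogs(wals, "rs%2C1.200", true, id) computes
  // headSet("rs%2C1.200", true) = {"rs%2C1.100", "rs%2C1.200"}, removes both from queue
  // storage, and clears them from the in-memory set, leaving only "rs%2C1.300" tracked.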
  // public because we call it in TestReplicationEmptyWALRecovery
  public void preLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths so that a newly opened source does not miss the new log
    synchronized (this.latestPaths) {
      // Add log to queue storage
      for (ReplicationSourceInterface source : this.sources.values()) {
        // If recording the log in queue storage fails, abort the RS and throw an exception so
        // the log roll fails
        abortAndThrowIOExceptionWhenFail(
          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
      }

      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
            .entrySet()) {
          String peerId = entry.getKey();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, we don't need to keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
      Iterator<Path> iterator = latestPaths.iterator();
      while (iterator.hasNext()) {
        Path path = iterator.next();
        if (path.getName().contains(logPrefix)) {
          iterator.remove();
          break;
        }
      }
      this.latestPaths.add(newLog);
    }
  }
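  // Sketch of the roll protocol (hedged; pre/postLogRoll are driven by wal-roll hooks such as
  // WALActionsListener in the usual wiring, which is an assumption about the caller):
  //
  //   manager.preLogRoll(newLog);    // record newLog in queue storage, walsById, latestPaths
  //   // ... the wal roll itself happens ...
  //   manager.postLogRoll(newLog);   // hand newLog to every normal source via enqueueLog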
  // public because we call it in TestReplicationEmptyWALRecovery
  public void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", newLog,
        source.getQueueId());
    }
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(ServerName.valueOf(regionserver));
  }

  /**
   * Transfer all the queues of the specified region server to this region server. First it tries
   * to grab a lock, and if that succeeds it will move the old queues and finally delete them.
   * <p>
   * It creates one old source for any type of source of the old rs.
   */
  private void transferQueues(ServerName deadRS) {
    if (server.getServerName().equals(deadRS)) {
      // it's just us, give up
      return;
    }
    NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
        .getGlobalSource().incrFailedRecoveryQueue();
      LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the queues from dead
   * region servers.
   */
  class NodeFailoverWorker extends Thread {

    private final ServerName deadRS;
    // After claiming the queues from a dead region server, the NodeFailoverWorker will skip
    // starting the RecoveredReplicationSource if the peer has been removed. But it's possible
    // that a peer with peerId = 2 is removed and another peer with peerId = 2 is added again
    // while the NodeFailoverWorker runs. So we need a deep-copied <peerId, peer> map to decide
    // whether we should start the RecoveredReplicationSource. If the latest peer is not the old
    // peer from when the NodeFailoverWorker began, we should skip starting the
    // RecoveredReplicationSource; otherwise the rs will abort (see HBASE-20475).
    private final Map<String, ReplicationPeerImpl> peersSnapshot;

    public NodeFailoverWorker(ServerName deadRS) {
      super("Failover-for-" + deadRS);
      this.deadRS = deadRS;
      peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
    }

    private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
      ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
      return oldPeerRef != null && oldPeerRef == newPeerRef;
    }

    @Override
    public void run() {
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
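      // Worked example with the default replication.sleep.before.failover=30000: the wait below
      // is 30000 + nextFloat() * 30000, i.e. uniform in [30s, 60s), e.g. 30000 + 0.42 * 30000 =
      // 42600 ms, so surviving servers spread out their attempts to claim the dead RS's queues.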
      try {
        Thread.sleep(sleepBeforeFailover +
          (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      Map<String, Set<String>> newQueues = new HashMap<>();
      try {
        List<String> queues = queueStorage.getAllQueues(deadRS);
        while (!queues.isEmpty()) {
          Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
            queues.get(ThreadLocalRandom.current().nextInt(queues.size())),
            server.getServerName());
          long sleep = sleepBeforeFailover / 2;
          if (!peer.getSecond().isEmpty()) {
            newQueues.put(peer.getFirst(), peer.getSecond());
            sleep = sleepBeforeFailover;
          }
          try {
            Thread.sleep(sleep);
          } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting before transferring a queue.");
            Thread.currentThread().interrupt();
          }
          queues = queueStorage.getAllQueues(deadRS);
        }
        if (queues.isEmpty()) {
          queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
        }
      } catch (ReplicationException e) {
        LOG.error("ReplicationException: cannot claim dead region server ({})'s " +
          "replication queue. Znode: ({}). " +
          "Possible solution: check if znode size exceeds jute.maxBuffer value. " +
          "If so, increase it for both client and server side.", deadRS,
          queueStorage.getRsNode(deadRS), e);
        server.abort("Failed to claim queue from dead regionserver.", e);
        return;
      }
      // Copying over the failed queue is completed.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
        String queueId = entry.getKey();
        Set<String> walsSet = entry.getValue();
        try {
          // there is no actual peer defined corresponding to peerId for the failover.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
          String actualPeerId = replicationQueueInfo.getPeerId();

          ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
          if (peer == null || !isOldPeer(actualPeerId, peer)) {
            LOG.warn("Skipping failover for peer {} of node {}, peer is null or changed",
              actualPeerId, deadRS);
            abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
            continue;
          }
          if (server instanceof ReplicationSyncUp.DummyServer
              && peer.getPeerState().equals(PeerState.DISABLED)) {
            LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
                + "replicating data to this peer.", actualPeerId);
            continue;
          }
          // track sources in walsByIdRecoveredQueues
          Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
          walsByIdRecoveredQueues.put(queueId, walsByGroup);
          for (String wal : walsSet) {
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
            NavigableSet<String> wals = walsByGroup.get(walPrefix);
            if (wals == null) {
              wals = new TreeSet<>();
              walsByGroup.put(walPrefix, wals);
            }
            wals.add(wal);
          }

          ReplicationSourceInterface src = createSource(queueId, peer);
          // synchronized on oldsources to avoid adding a recovered source for a to-be-removed
          // peer
          synchronized (oldsources) {
            peer = replicationPeers.getPeer(src.getPeerId());
            if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              removeRecoveredSource(src);
              continue;
            }
            oldsources.add(src);
            LOG.info("Added recovered source {}", src.getQueueId());
            for (String wal : walsSet) {
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get a copy of the wals of the normal sources on this rs
   * @return a sorted set of wal names
   */
  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get a copy of the wals of the recovered sources on this rs
   * @return a sorted set of wal names
   */
  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return new ArrayList<>(this.sources.values());
  }

  /**
   * Get a list of all the recovered sources of this rs
   * @return list of all recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  /**
   * Get the normal source for a given peer
   * @return the normal source for the given peer if it exists, otherwise null.
   */
  public ReplicationSourceInterface getSource(String peerId) {
    return this.sources.get(peerId);
  }

  List<String> getAllQueues() throws IOException {
    List<String> allQueues = Collections.emptyList();
    try {
      allQueues = queueStorage.getAllQueues(server.getServerName());
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    return allQueues;
  }

  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  public AtomicLong getTotalBufferUsed() {
    return totalBufferUsed;
  }
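  // Illustrative accounting sketch (the real callers live in the wal readers/shippers, which is
  // an assumption about wiring, not something this class enforces): producers grow
  // totalBufferUsed as they batch edits and are expected to back off at the limit:
  //
  //   long newSize = manager.getTotalBufferUsed().addAndGet(entrySize);
  //   if (newSize >= manager.getTotalBufferLimit()) {
  //     // stop reading more wal entries until shipped edits release buffer space
  //   }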
  /**
   * Returns the maximum size in bytes of edits held in memory which are pending replication
   * across all sources inside this RegionServer.
   */
  public long getTotalBufferLimit() {
    return totalBufferLimit;
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {
    return this.replicationPeers;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    // Print stats that apply across all Replication Sources
    stats.append("Global stats: ");
    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
      .append(getTotalBufferLimit()).append("B\n");
    for (ReplicationSourceInterface source : this.sources.values()) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws IOException {
    for (ReplicationSourceInterface source : this.sources.values()) {
      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
  }

  int activeFailoverTaskCount() {
    return executor.getActiveCount();
  }

  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
    return this.globalMetrics;
  }

  /**
   * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
   * Create it once only. If it exists already, use the existing one.
   * @see #removeCatalogReplicationSource(RegionInfo)
   * @see #addSource(String) This is a specialization of the addSource method.
   */
  public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
      throws IOException {
    // Poor-man's putIfAbsent
    synchronized (this.catalogReplicationSource) {
      ReplicationSourceInterface rs = this.catalogReplicationSource.get();
      return rs != null ? rs :
        this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo));
    }
  }

  /**
   * Remove the hbase:meta Catalog replication source.
   * Called when we close hbase:meta.
   * @see #addCatalogReplicationSource(RegionInfo regionInfo)
   */
  public void removeCatalogReplicationSource(RegionInfo regionInfo) {
    // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
    // comes back to this server.
  }
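  // Usage sketch (hedged; the caller is expected to be the hbase:meta region open/close path,
  // which is an assumption about wiring):
  //
  //   if (region.getRegionInfo().isMetaRegion()) {
  //     manager.addCatalogReplicationSource(region.getRegionInfo());
  //   }
  //
  // while close calls removeCatalogReplicationSource, which intentionally keeps the source.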
  /**
   * Create, initialize, and start the Catalog ReplicationSource.
   * Presumes it is called one time only (the caller must ensure a one-time-only call).
   * This ReplicationSource is NOT created via {@link ReplicationSourceFactory}.
   * @see #addSource(String) This is a specialization of the addSource call.
   * @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on
   *      why the special handling).
   */
  private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
      throws IOException {
    // Instantiate the meta walProvider. It is instantiated here or over in the #warmupRegion
    // call made by the Master on a 'move' operation. We need to do extra work if we did NOT
    // instantiate the provider.
    WALProvider walProvider = this.walFactory.getMetaWALProvider();
    boolean instantiate = walProvider == null;
    if (instantiate) {
      walProvider = this.walFactory.getMetaProvider();
    }
    // Here we do a specialization of what {@link ReplicationSourceFactory} does. There is no
    // need for persisting offsets into WALs up in zookeeper (via ReplicationQueueInfo) as the
    // catalog read replicas feature that makes use of the source does a reset on a crash of the
    // WAL source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)"
    // in the design doc attached to HBASE-18070 'Enable memstore replication for meta replica'
    // for detail.
    CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
      this.clusterId.toString(),
      "meta_" + ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
    final ReplicationSourceInterface crs = new CatalogReplicationSource();
    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
    // Add a listener on the provider so we can pick up the WAL to replicate on roll.
    WALActionsListener listener = new WALActionsListener() {
      @Override
      public void postLogRoll(Path oldPath, Path newPath) throws IOException {
        crs.enqueueLog(newPath);
      }
    };
    walProvider.addWALActionsListener(listener);
    if (!instantiate) {
      // If we did not instantiate the provider, we need to add our listener on the
      // already-created WAL instance too (listeners are passed by the provider to the WAL
      // instance on creation, but if the provider was created already, our listener add above
      // is missed). And add the current WAL file to the Replication Source so it can start
      // replicating it.
      WAL wal = walProvider.getWAL(regionInfo);
      wal.registerWALActionsListener(listener);
      crs.enqueueLog(((AbstractFSWAL) wal).getCurrentFileName());
    }
    return crs.startup();
  }
}