/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication
 * sources. There are two classes of sources:
 * <ul>
 * <li> Normal sources are persistent and one per peer cluster</li>
 * <li> Old sources are recovered from a failed region server and our
 * only goal is to finish replicating the WAL queue it had up in ZK</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all the queues in a local
 * old source.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Logger LOG =
      LoggerFactory.getLogger(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from died RSs
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueues replicationQueues;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;
  // All logs we are currently tracking
  // Index structure of the map is: peer_id->logPrefix/logGroup->logs
  // This is a plain HashMap: every access that mutates or iterates it must hold its monitor.
  private final Map<String, Map<String, SortedSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for new coming peers
  private final Set<Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFileLengthProvider walFileLengthProvider;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executer service for replication
  private final ThreadPoolExecutor executor;

  private final boolean replicationForBulkLoadDataEnabled;

  // Shared counter handed out through getTotalBufferUsed(); never reassigned.
  private final AtomicLong totalBufferUsed = new AtomicLong();

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface used to look up and connect peer clusters
   * @param replicationTracker the tracker this manager registers itself on as a listener
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId UUID for this cluster
   * @param walFileLengthProvider handed to every replication source created by this manager
   */
  public ReplicationSourceManager(ReplicationQueues replicationQueues,
      ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
      Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
      WALFileLengthProvider walFileLengthProvider) throws IOException {
    //CopyOnWriteArrayList is thread-safe.
    //Generally, reading is more than modifying.
    this.sources = new CopyOnWriteArrayList<>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new HashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new CopyOnWriteArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover =
        conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
    this.clusterId = clusterId;
    this.walFileLengthProvider = walFileLengthProvider;
    this.replicationTracker.registerListener(this);
    // NOTE(review): the returned ids are discarded; presumably this call is kept for a side
    // effect inside ReplicationPeers (e.g. setting a watch) -- confirm before removing.
    this.replicationPeers.getAllPeerIds();
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashSet<Path>();
    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
  }

  /**
   * Provide the id of the peer and a log key and this method will figure which
   * wal it belongs to and will log, for this region server, the current
   * position. Unless {@code holdLogInZK} is set, it will also clean older logs from the queue.
   * @param log Path to the log currently being replicated
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
   * @param holdLogInZK if true then the log is retained in ZK
   */
  public void logPositionAndCleanOldLogs(Path log, String id, long position,
      boolean queueRecovered, boolean holdLogInZK) {
    String fileName = log.getName();
    this.replicationQueues.setLogPosition(id, fileName, position);
    if (holdLogInZK) {
      return;
    }
    cleanOldLogs(fileName, id, queueRecovered);
  }

  /**
   * Cleans all the log files older than the given one from ZK. Called when we are sure that a
   * log file is closed and has no more entries.
   * <p>
   * This is a no-op when the queue is no longer tracked (it may have been closed concurrently
   * by {@link #closeQueue} or {@link #closeRecoveredQueue}) or when the wal group has no logs.
   * @param key name of the log file; logs sorting strictly before it are removed
   * @param id id of the peer cluster
   * @param queueRecovered Whether this is a recovered queue
   */
  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key);
    if (queueRecovered) {
      Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id);
      if (walsForPeer == null) {
        // The recovered queue was already closed; nothing left to clean.
        return;
      }
      cleanOldLogsUnlessOldest(walsForPeer.get(logPrefix), key, id);
    } else {
      synchronized (this.walsById) {
        Map<String, SortedSet<String>> walsForPeer = walsById.get(id);
        if (walsForPeer == null) {
          // The peer's queue was already closed; nothing left to clean.
          return;
        }
        cleanOldLogsUnlessOldest(walsForPeer.get(logPrefix), key, id);
      }
    }
  }

  /**
   * Removes the logs older than {@code key} unless the set is missing, empty, or {@code key}
   * is already its oldest entry.
   */
  private void cleanOldLogsUnlessOldest(SortedSet<String> wals, String key, String id) {
    // first() throws NoSuchElementException on an empty set, so check emptiness first.
    if (wals != null && !wals.isEmpty() && !wals.first().equals(key)) {
      cleanOldLogs(wals, key, id);
    }
  }

  /**
   * Removes every log sorting before {@code key} from both the replication queue storage and
   * the given in-memory set.
   */
  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
    SortedSet<String> walSet = wals.headSet(key);
    LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
    for (String wal : walSet) {
      this.replicationQueues.removeLog(id, wal);
    }
    // headSet is a view: clearing it drops the same entries from the backing set.
    walSet.clear();
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all
   * old region server wal queues
   */
  void init() throws IOException, ReplicationException {
    for (String id : this.replicationPeers.getConnectedPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
        // when a peer was added before replication for bulk loaded data was enabled.
        this.replicationQueues.addPeerToHFileRefs(id);
      }
    }
    AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
    try {
      this.executor.execute(adoptionWorker);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
    }
  }

  /**
   * Add sources for the given peer cluster on this region server. For the newly added peer, we only
   * need to enqueue the latest log of each wal group and do replication
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException if the source or its endpoint cannot be created
   * @throws ReplicationException if a latest log cannot be added to the peer's queue; the
   *           server is asked to stop before the exception is rethrown
   */
  @VisibleForTesting
  ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
    ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
      this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
      walFileLengthProvider);
    synchronized (this.walsById) {
      this.sources.add(src);
      Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(id, walsByGroup);
      // Add the latest wal to that source's queue
      synchronized (latestPaths) {
        if (this.latestPaths.size() > 0) {
          for (Path logPath : latestPaths) {
            String name = logPath.getName();
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
            SortedSet<String> logs = new TreeSet<>();
            logs.add(name);
            walsByGroup.put(walPrefix, logs);
            try {
              this.replicationQueues.addLog(id, name);
            } catch (ReplicationException e) {
              String message =
                  "Cannot add log to queue when creating a new source, queueId=" + id
                      + ", filename=" + name;
              server.stop(message);
              throw e;
            }
            src.enqueueLog(logPath);
          }
        }
      }
    }
    src.startup();
    return src;
  }

  @VisibleForTesting
  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  /**
   * Delete a complete queue of wals associated with a peer cluster
   * @param peerId Id of the peer cluster queue of wals to delete
   * @param closeConnection whether the peer should also be marked as disconnected
   */
  public void deleteSource(String peerId, boolean closeConnection) {
    this.replicationQueues.removeQueue(peerId);
    if (closeConnection) {
      this.replicationPeers.peerDisconnected(peerId);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get a read-only view (not a copy) of the wals tracked for the normal sources on this rs
   * @return unmodifiable map of peer id to wal group prefix to sorted wal names
   */
  @VisibleForTesting
  Map<String, Map<String, SortedSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get a read-only view (not a copy) of the wals tracked for the recovered sources on this rs
   * @return unmodifiable map of queue id to wal group prefix to sorted wal names
   */
  @VisibleForTesting
  Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }

  /**
   * Get a list of all the old sources of this rs
   * @return list of all old sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  /**
   * Get the normal source for a given peer
   * @param peerId the id of the peer cluster
   * @return the normal source for the given peer if it exists, otherwise null.
   */
  public ReplicationSourceInterface getSource(String peerId) {
    return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
  }

  @VisibleForTesting
  List<String> getAllQueues() {
    return replicationQueues.getAllQueues();
  }

  // public because of we call it in TestReplicationEmptyWALRecovery
  @VisibleForTesting
  public void preLogRoll(Path newLog) throws IOException {
    recordLog(newLog);
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    synchronized (latestPaths) {
      // Replace the previous latest path of this wal group (at most one is removed).
      Iterator<Path> iterator = latestPaths.iterator();
      while (iterator.hasNext()) {
        Path path = iterator.next();
        if (path.getName().contains(logPrefix)) {
          iterator.remove();
          break;
        }
      }
      this.latestPaths.add(newLog);
    }
  }

  /**
   * Check and enqueue the given log to the correct source. If there's still no source for the
   * group to which the given log belongs, create one
   * @param logPath the log path to check and enqueue
   * @throws IOException if the log cannot be added to a peer's replication queue
   */
  private void recordLog(Path logPath) throws IOException {
    String logName = logPath.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // update replication queues on ZK
    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
    synchronized (replicationPeers) {
      for (String id : replicationPeers.getConnectedPeerIds()) {
        try {
          this.replicationQueues.addLog(id, logName);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue"
              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
        }
      }
    }
    // update walsById map
    synchronized (walsById) {
      for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
        String peerId = entry.getKey();
        Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
        boolean existingPrefix = false;
        for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
          SortedSet<String> wals = walsEntry.getValue();
          if (this.sources.isEmpty()) {
            // If there's no slaves, don't need to keep the old wals since
            // we only consider the last one when a new slave comes in
            wals.clear();
          }
          if (logPrefix.equals(walsEntry.getKey())) {
            wals.add(logName);
            existingPrefix = true;
          }
        }
        if (!existingPrefix) {
          // The new log belongs to a new group, add it into this peer
          LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
          SortedSet<String> wals = new TreeSet<>();
          wals.add(logName);
          walsByPrefix.put(logPrefix, wals);
        }
      }
    }
  }

  // public because of we call it in TestReplicationEmptyWALRecovery
  @VisibleForTesting
  public void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources) {
      source.enqueueLog(newLog);
    }
  }

  @VisibleForTesting
  public AtomicLong getTotalBufferUsed() {
    return totalBufferUsed;
  }

  /**
   * Factory method to create a replication source together with its replication endpoint
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
   * @param replicationQueues the queues interface handed to the source
   * @param replicationPeers the peers interface handed to the source
   * @param server the server object for this region server
   * @param peerId the id of the peer cluster
   * @param clusterId UUID for this cluster
   * @param peerConfig peer configuration; supplies the endpoint implementation class name
   * @param replicationPeer the connected peer the endpoint is initialized with
   * @param walFileLengthProvider handed to the source
   * @return the created source
   * @throws IOException if the endpoint cannot be instantiated, or from source/endpoint init
   */
  private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
      ReplicationSourceManager manager, ReplicationQueues replicationQueues,
      ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
      WALFileLengthProvider walFileLengthProvider) throws IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }

    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId);

    ReplicationEndpoint replicationEndpoint = null;
    try {
      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
      if (replicationEndpointImpl == null) {
        // Default to HBase inter-cluster replication endpoint
        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
      }
      Class<?> c = Class.forName(replicationEndpointImpl);
      replicationEndpoint = (ReplicationEndpoint) c.getDeclaredConstructor().newInstance();
      if (rsServerHost != null) {
        ReplicationEndpoint newReplicationEndPoint = rsServerHost
            .postCreateReplicationEndPoint(replicationEndpoint);
        if (newReplicationEndPoint != null) {
          // Replace the configured endpoint with the one returned by the coprocessor hook
          replicationEndpoint = newReplicationEndPoint;
        }
      }
    } catch (Exception e) {
      LOG.warn("Passed replication endpoint implementation throws errors"
          + " while initializing ReplicationSource for peer: " + peerId, e);
      throw new IOException(e);
    }

    MetricsSource metrics = new MetricsSource(peerId);
    // init replication source
    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
      replicationEndpoint, walFileLengthProvider, metrics);

    // init replication endpoint
    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
      fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));

    return src;
  }

  /**
   * Transfer all the queues of the specified region server to this region server.
   * First it tries to grab a lock and if it works it will move the
   * znodes and finally will delete the old znodes.
   *
   * It creates one old source for any type of source of the old rs.
   * @param rsZnode znode of the region server whose queues should be taken over
   */
  private void transferQueues(String rsZnode) {
    NodeFailoverWorker transfer =
        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
            this.clusterId);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
    }
  }

  /**
   * Clear the references to the specified old (recovered) source
   * @param src source to clear
   */
  public void closeRecoveredQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
    if (src instanceof ReplicationSource) {
      ((ReplicationSource) src).getSourceMetrics().clear();
    }
    this.oldsources.remove(src);
    deleteSource(src.getPeerClusterZnode(), false);
    this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
  }

  /**
   * Clear the references to the specified normal source
   * @param src source to clear
   */
  public void closeQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getPeerClusterZnode());
    src.getSourceMetrics().clear();
    this.sources.remove(src);
    deleteSource(src.getPeerClusterZnode(), true);
    // walsById is a plain HashMap that recordLog/addSource/cleanOldLogs access under its
    // monitor; removing without the lock could corrupt it or break a concurrent iteration.
    synchronized (this.walsById) {
      this.walsById.remove(src.getPeerClusterZnode());
    }
  }

  /**
   * This method first deletes all the recovered sources for the specified
   * id, then deletes the normal source (deleting all related data in ZK).
   * @param id The id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (id.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        closeRecoveredQueue(src);
      }
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    List<ReplicationSourceInterface> srcToRemove = new ArrayList<>();
    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
    synchronized (this.replicationPeers) {
      for (ReplicationSourceInterface src : this.sources) {
        if (id.equals(src.getPeerId())) {
          srcToRemove.add(src);
        }
      }
      if (srcToRemove.isEmpty()) {
        LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " +
            "This could mean that ReplicationSourceInterface initialization failed for this peer " +
            "and that replication on this peer may not be caught up. peerId=" + id);
      }
      for (ReplicationSourceInterface toRemove : srcToRemove) {
        toRemove.terminate(terminateMessage);
        closeQueue(toRemove);
      }
      deleteSource(id, true);
    }
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }

  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
    this.replicationQueues.removePeerFromHFileRefs(peerId);
  }

  @Override
  public void peerListChanged(List<String> peerIds) {
    for (String id : peerIds) {
      try {
        boolean added = this.replicationPeers.peerConnected(id);
        if (added) {
          addSource(id);
          if (replicationForBulkLoadDataEnabled) {
            this.replicationQueues.addPeerToHFileRefs(id);
          }
        }
      } catch (Exception e) {
        LOG.error("Error while adding a new peer", e);
      }
    }
  }

  /**
   * Class responsible to setup new ReplicationSources to take care of the
   * queues from dead region servers.
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * @param rsZnode znode of the dead region server whose queues should be claimed
     */
    public NodeFailoverWorker(String rsZnode) {
      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
    }

    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-"+rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      if (this.rq.isThisOurRegionServer(rsZnode)) {
        return;
      }
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover +
            (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      // Claim the dead server's queues one at a time, picking a random unclaimed one each round.
      Map<String, Set<String>> newQueues = new HashMap<>();
      List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
      while (peers != null && !peers.isEmpty()) {
        Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
          peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
        long sleep = sleepBeforeFailover/2;
        if (peer != null) {
          newQueues.put(peer.getFirst(), peer.getSecond());
          sleep = sleepBeforeFailover;
        }
        try {
          Thread.sleep(sleep);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting before transferring a queue.");
          Thread.currentThread().interrupt();
        }
        peers = rq.getUnClaimedQueueIds(rsZnode);
      }
      if (peers != null) {
        rq.removeReplicatorIfQueueIsEmpty(rsZnode);
      }
      // Copying over the failed queue is completed.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        Set<String> walsSet = entry.getValue();
        try {
          // there is not an actual peer defined corresponding to peerId for the failover.
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
          String actualPeerId = replicationQueueInfo.getPeerId();
          ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
          ReplicationPeerConfig peerConfig = null;
          try {
            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
          } catch (ReplicationException ex) {
            // Pass the exception as the throwable so the stack trace is kept and the message
            // is not fused with the exception text.
            LOG.warn("Received exception while getting replication peer config, skipping replay",
              ex);
          }
          if (peer == null || peerConfig == null) {
            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
            replicationQueues.removeQueue(peerId);
            continue;
          }
          if (server instanceof ReplicationSyncUp.DummyServer
              && peer.getPeerState().equals(PeerState.DISABLED)) {
            LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
                + "replicating data to this peer.",
              actualPeerId);
            continue;
          }
          // track sources in walsByIdRecoveredQueues
          Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
          walsByIdRecoveredQueues.put(peerId, walsByGroup);
          for (String wal : walsSet) {
            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
            SortedSet<String> wals = walsByGroup.get(walPrefix);
            if (wals == null) {
              wals = new TreeSet<>();
              walsByGroup.put(walPrefix, wals);
            }
            wals.add(wal);
          }

          // enqueue sources
          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
          // see removePeer
          synchronized (oldsources) {
            if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              closeRecoveredQueue(src);
              continue;
            }
            oldsources.add(src);
            for (String wal : walsSet) {
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Worker that, once at startup, hands every replicator that is not a currently listed region
   * server over to {@link #transferQueues(String)}.
   */
  class AdoptAbandonedQueuesWorker extends Thread{

    public AdoptAbandonedQueuesWorker() {}

    @Override
    public void run() {
      List<String> currentReplicators = null;
      try {
        currentReplicators = replicationQueues.getListOfReplicators();
      } catch (ReplicationException e) {
        server.abort("Failed to get all replicators", e);
        return;
      }
      if (currentReplicators == null || currentReplicators.isEmpty()) {
        return;
      }
      List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
      LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
        + otherRegionServers);

      // Look if there's anything to process after a restart
      for (String rs : currentReplicators) {
        if (!otherRegionServers.contains(rs)) {
          transferQueues(rs);
        }
      }
    }
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}

  /**
   * Get a string representation of all the sources' metrics
   * @return one line per normal source followed by one line per recovered source
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    for (ReplicationSourceInterface source : sources) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId()+": ");
      stats.append(oldSource.getStats()+ "\n");
    }
    return stats.toString();
  }

  /**
   * Forwards the given bulk-loaded hfile references to every normal source.
   * @param tableName table the hfiles belong to
   * @param family column family the hfiles belong to
   * @param pairs hfile path pairs, passed through unchanged to each source
   * @throws ReplicationException if a source fails to record the references
   */
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws ReplicationException {
    for (ReplicationSourceInterface source : this.sources) {
      source.addHFileRefs(tableName, family, pairs);
    }
  }

  /**
   * Removes the given hfile references of a peer from the replication queue storage.
   * @param peerId id of the peer cluster
   * @param files names of the hfile references to remove
   */
  public void cleanUpHFileRefs(String peerId, List<String> files) {
    this.replicationQueues.removeHFileRefs(peerId, files);
  }
}