001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.replication.regionserver;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import java.util.SortedSet;
032import java.util.TreeSet;
033import java.util.UUID;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.CopyOnWriteArrayList;
036import java.util.concurrent.LinkedBlockingQueue;
037import java.util.concurrent.RejectedExecutionException;
038import java.util.concurrent.ThreadLocalRandom;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicLong;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.Server;
047import org.apache.hadoop.hbase.TableDescriptors;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.regionserver.HRegionServer;
050import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
051import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
052import org.apache.hadoop.hbase.replication.ReplicationException;
053import org.apache.hadoop.hbase.replication.ReplicationListener;
054import org.apache.hadoop.hbase.replication.ReplicationPeer;
055import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
056import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
057import org.apache.hadoop.hbase.replication.ReplicationPeers;
058import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
059import org.apache.hadoop.hbase.replication.ReplicationQueues;
060import org.apache.hadoop.hbase.replication.ReplicationTracker;
061import org.apache.hadoop.hbase.util.Pair;
062import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
063import org.apache.yetus.audience.InterfaceAudience;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
068import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
069
/**
 * This class is responsible for managing all the replication
 * sources. There are two classes of sources:
 * <ul>
 * <li> Normal sources are persistent and one per peer cluster</li>
 * <li> Old sources are recovered from a failed region server and our
 * only goal is to finish replicating the WAL queue it left in ZK</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all of its queues to local
 * old sources.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
 */
086@InterfaceAudience.Private
087public class ReplicationSourceManager implements ReplicationListener {
088  private static final Logger LOG =
089      LoggerFactory.getLogger(ReplicationSourceManager.class);
090  // List of all the sources that read this RS's logs
091  private final List<ReplicationSourceInterface> sources;
092  // List of all the sources we got from died RSs
093  private final List<ReplicationSourceInterface> oldsources;
094  private final ReplicationQueues replicationQueues;
095  private final ReplicationTracker replicationTracker;
096  private final ReplicationPeers replicationPeers;
097  // UUID for this cluster
098  private final UUID clusterId;
099  // All about stopping
100  private final Server server;
101  // All logs we are currently tracking
102  // Index structure of the map is: peer_id->logPrefix/logGroup->logs
103  private final Map<String, Map<String, SortedSet<String>>> walsById;
104  // Logs for recovered sources we are currently tracking
105  private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
106  private final Configuration conf;
107  private final FileSystem fs;
108  // The paths to the latest log of each wal group, for new coming peers
109  private final Set<Path> latestPaths;
110  // Path to the wals directories
111  private final Path logDir;
112  // Path to the wal archive
113  private final Path oldLogDir;
114  private final WALFileLengthProvider walFileLengthProvider;
115  // The number of ms that we wait before moving znodes, HBASE-3596
116  private final long sleepBeforeFailover;
117  // Homemade executer service for replication
118  private final ThreadPoolExecutor executor;
119
120  private final boolean replicationForBulkLoadDataEnabled;
121
122
123  private AtomicLong totalBufferUsed = new AtomicLong();
124
125  /**
126   * Creates a replication manager and sets the watch on all the other registered region servers
127   * @param replicationQueues the interface for manipulating replication queues
128   * @param replicationPeers
129   * @param replicationTracker
130   * @param conf the configuration to use
131   * @param server the server for this region server
132   * @param fs the file system to use
133   * @param logDir the directory that contains all wal directories of live RSs
134   * @param oldLogDir the directory where old logs are archived
135   * @param clusterId
136   */
137  public ReplicationSourceManager(ReplicationQueues replicationQueues,
138      ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
139      Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
140      WALFileLengthProvider walFileLengthProvider) throws IOException {
141    //CopyOnWriteArrayList is thread-safe.
142    //Generally, reading is more than modifying.
143    this.sources = new CopyOnWriteArrayList<>();
144    this.replicationQueues = replicationQueues;
145    this.replicationPeers = replicationPeers;
146    this.replicationTracker = replicationTracker;
147    this.server = server;
148    this.walsById = new HashMap<>();
149    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
150    this.oldsources = new CopyOnWriteArrayList<>();
151    this.conf = conf;
152    this.fs = fs;
153    this.logDir = logDir;
154    this.oldLogDir = oldLogDir;
155    this.sleepBeforeFailover =
156        conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
157    this.clusterId = clusterId;
158    this.walFileLengthProvider = walFileLengthProvider;
159    this.replicationTracker.registerListener(this);
160    this.replicationPeers.getAllPeerIds();
161    // It's preferable to failover 1 RS at a time, but with good zk servers
162    // more could be processed at the same time.
163    int nbWorkers = conf.getInt("replication.executor.workers", 1);
164    // use a short 100ms sleep since this could be done inline with a RS startup
165    // even if we fail, other region servers can take care of it
166    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
167        100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
168    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
169    tfb.setNameFormat("ReplicationExecutor-%d");
170    tfb.setDaemon(true);
171    this.executor.setThreadFactory(tfb.build());
172    this.latestPaths = new HashSet<Path>();
173    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
174      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
175  }
176
177  /**
178   * Provide the id of the peer and a log key and this method will figure which
179   * wal it belongs to and will log, for this region server, the current
180   * position. It will also clean old logs from the queue.
181   * @param log Path to the log currently being replicated from
182   * replication status in zookeeper. It will also delete older entries.
183   * @param id id of the peer cluster
184   * @param position current location in the log
185   * @param queueRecovered indicates if this queue comes from another region server
186   * @param holdLogInZK if true then the log is retained in ZK
187   */
188  public void logPositionAndCleanOldLogs(Path log, String id, long position,
189      boolean queueRecovered, boolean holdLogInZK) {
190    String fileName = log.getName();
191    this.replicationQueues.setLogPosition(id, fileName, position);
192    if (holdLogInZK) {
193     return;
194    }
195    cleanOldLogs(fileName, id, queueRecovered);
196  }
197
198  /**
199   * Cleans a log file and all older files from ZK. Called when we are sure that a
200   * log file is closed and has no more entries.
201   * @param key Path to the log
202   * @param id id of the peer cluster
203   * @param queueRecovered Whether this is a recovered queue
204   */
205  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
206    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key);
207    if (queueRecovered) {
208      SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
209      if (wals != null && !wals.first().equals(key)) {
210        cleanOldLogs(wals, key, id);
211      }
212    } else {
213      synchronized (this.walsById) {
214        SortedSet<String> wals = walsById.get(id).get(logPrefix);
215        if (wals != null && !wals.first().equals(key)) {
216          cleanOldLogs(wals, key, id);
217        }
218      }
219    }
220 }
221
222  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
223    SortedSet<String> walSet = wals.headSet(key);
224    LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
225    for (String wal : walSet) {
226      this.replicationQueues.removeLog(id, wal);
227    }
228    walSet.clear();
229  }
230
231  /**
232   * Adds a normal source per registered peer cluster and tries to process all
233   * old region server wal queues
234   */
235  void init() throws IOException, ReplicationException {
236    for (String id : this.replicationPeers.getConnectedPeerIds()) {
237      addSource(id);
238      if (replicationForBulkLoadDataEnabled) {
239        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
240        // when a peer was added before replication for bulk loaded data was enabled.
241        this.replicationQueues.addPeerToHFileRefs(id);
242      }
243    }
244    AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
245    try {
246      this.executor.execute(adoptionWorker);
247    } catch (RejectedExecutionException ex) {
248      LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
249    }
250  }
251
252  /**
253   * Add sources for the given peer cluster on this region server. For the newly added peer, we only
254   * need to enqueue the latest log of each wal group and do replication
255   * @param id the id of the peer cluster
256   * @return the source that was created
257   * @throws IOException
258   */
259  @VisibleForTesting
260  ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
261    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
262    ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
263    ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
264      this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
265      walFileLengthProvider);
266    synchronized (this.walsById) {
267      this.sources.add(src);
268      Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
269      this.walsById.put(id, walsByGroup);
270      // Add the latest wal to that source's queue
271      synchronized (latestPaths) {
272        if (this.latestPaths.size() > 0) {
273          for (Path logPath : latestPaths) {
274            String name = logPath.getName();
275            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
276            SortedSet<String> logs = new TreeSet<>();
277            logs.add(name);
278            walsByGroup.put(walPrefix, logs);
279            try {
280              this.replicationQueues.addLog(id, name);
281            } catch (ReplicationException e) {
282              String message =
283                  "Cannot add log to queue when creating a new source, queueId=" + id
284                      + ", filename=" + name;
285              server.stop(message);
286              throw e;
287            }
288            src.enqueueLog(logPath);
289          }
290        }
291      }
292    }
293    src.startup();
294    return src;
295  }
296
297  @VisibleForTesting
298  int getSizeOfLatestPath() {
299    synchronized (latestPaths) {
300      return latestPaths.size();
301    }
302  }
303
304  /**
305   * Delete a complete queue of wals associated with a peer cluster
306   * @param peerId Id of the peer cluster queue of wals to delete
307   */
308  public void deleteSource(String peerId, boolean closeConnection) {
309    this.replicationQueues.removeQueue(peerId);
310    if (closeConnection) {
311      this.replicationPeers.peerDisconnected(peerId);
312    }
313  }
314
315  /**
316   * Terminate the replication on this region server
317   */
318  public void join() {
319    this.executor.shutdown();
320    for (ReplicationSourceInterface source : this.sources) {
321      source.terminate("Region server is closing");
322    }
323  }
324
325  /**
326   * Get a copy of the wals of the first source on this rs
327   * @return a sorted set of wal names
328   */
329  @VisibleForTesting
330  Map<String, Map<String, SortedSet<String>>> getWALs() {
331    return Collections.unmodifiableMap(walsById);
332  }
333
334  /**
335   * Get a copy of the wals of the recovered sources on this rs
336   * @return a sorted set of wal names
337   */
338  @VisibleForTesting
339  Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
340    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
341  }
342
343  /**
344   * Get a list of all the normal sources of this rs
345   * @return lis of all sources
346   */
347  public List<ReplicationSourceInterface> getSources() {
348    return this.sources;
349  }
350
351  /**
352   * Get a list of all the old sources of this rs
353   * @return list of all old sources
354   */
355  public List<ReplicationSourceInterface> getOldSources() {
356    return this.oldsources;
357  }
358
359  /**
360   * Get the normal source for a given peer
361   * @param peerId
362   * @return the normal source for the give peer if it exists, otherwise null.
363   */
364  public ReplicationSourceInterface getSource(String peerId) {
365    return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
366  }
367
368  @VisibleForTesting
369  List<String> getAllQueues() {
370    return replicationQueues.getAllQueues();
371  }
372
373  // public because of we call it in TestReplicationEmptyWALRecovery
374  @VisibleForTesting
375  public void preLogRoll(Path newLog) throws IOException {
376    recordLog(newLog);
377    String logName = newLog.getName();
378    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
379    synchronized (latestPaths) {
380      Iterator<Path> iterator = latestPaths.iterator();
381      while (iterator.hasNext()) {
382        Path path = iterator.next();
383        if (path.getName().contains(logPrefix)) {
384          iterator.remove();
385          break;
386        }
387      }
388      this.latestPaths.add(newLog);
389    }
390  }
391
392  /**
393   * Check and enqueue the given log to the correct source. If there's still no source for the
394   * group to which the given log belongs, create one
395   * @param logPath the log path to check and enqueue
396   * @throws IOException
397   */
398  private void recordLog(Path logPath) throws IOException {
399    String logName = logPath.getName();
400    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
401    // update replication queues on ZK
402    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
403    synchronized (replicationPeers) {
404      for (String id : replicationPeers.getConnectedPeerIds()) {
405        try {
406          this.replicationQueues.addLog(id, logName);
407        } catch (ReplicationException e) {
408          throw new IOException("Cannot add log to replication queue"
409              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
410        }
411      }
412    }
413    // update walsById map
414    synchronized (walsById) {
415      for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
416        String peerId = entry.getKey();
417        Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
418        boolean existingPrefix = false;
419        for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
420          SortedSet<String> wals = walsEntry.getValue();
421          if (this.sources.isEmpty()) {
422            // If there's no slaves, don't need to keep the old wals since
423            // we only consider the last one when a new slave comes in
424            wals.clear();
425          }
426          if (logPrefix.equals(walsEntry.getKey())) {
427            wals.add(logName);
428            existingPrefix = true;
429          }
430        }
431        if (!existingPrefix) {
432          // The new log belongs to a new group, add it into this peer
433          LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
434          SortedSet<String> wals = new TreeSet<>();
435          wals.add(logName);
436          walsByPrefix.put(logPrefix, wals);
437        }
438      }
439    }
440  }
441
442  // public because of we call it in TestReplicationEmptyWALRecovery
443  @VisibleForTesting
444  public void postLogRoll(Path newLog) throws IOException {
445    // This only updates the sources we own, not the recovered ones
446    for (ReplicationSourceInterface source : this.sources) {
447      source.enqueueLog(newLog);
448    }
449  }
450
451  @VisibleForTesting
452  public AtomicLong getTotalBufferUsed() {
453    return totalBufferUsed;
454  }
455
456  /**
457   * Factory method to create a replication source
458   * @param conf the configuration to use
459   * @param fs the file system to use
460   * @param manager the manager to use
461   * @param server the server object for this region server
462   * @param peerId the id of the peer cluster
463   * @return the created source
464   * @throws IOException
465   */
466  private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
467      ReplicationSourceManager manager, ReplicationQueues replicationQueues,
468      ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
469      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
470      WALFileLengthProvider walFileLengthProvider) throws IOException {
471    RegionServerCoprocessorHost rsServerHost = null;
472    TableDescriptors tableDescriptors = null;
473    if (server instanceof HRegionServer) {
474      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
475      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
476    }
477
478    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId);
479
480    ReplicationEndpoint replicationEndpoint = null;
481    try {
482      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
483      if (replicationEndpointImpl == null) {
484        // Default to HBase inter-cluster replication endpoint
485        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
486      }
487      Class<?> c = Class.forName(replicationEndpointImpl);
488      replicationEndpoint = (ReplicationEndpoint) c.getDeclaredConstructor().newInstance();
489      if(rsServerHost != null) {
490        ReplicationEndpoint newReplicationEndPoint = rsServerHost
491            .postCreateReplicationEndPoint(replicationEndpoint);
492        if(newReplicationEndPoint != null) {
493          // Override the newly created endpoint from the hook with configured end point
494          replicationEndpoint = newReplicationEndPoint;
495        }
496      }
497    } catch (Exception e) {
498      LOG.warn("Passed replication endpoint implementation throws errors"
499          + " while initializing ReplicationSource for peer: " + peerId, e);
500      throw new IOException(e);
501    }
502
503    MetricsSource metrics = new MetricsSource(peerId);
504    // init replication source
505    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
506      replicationEndpoint, walFileLengthProvider, metrics);
507
508    // init replication endpoint
509    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
510      fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
511
512    return src;
513  }
514
515  /**
516   * Transfer all the queues of the specified to this region server.
517   * First it tries to grab a lock and if it works it will move the
518   * znodes and finally will delete the old znodes.
519   *
520   * It creates one old source for any type of source of the old rs.
521   * @param rsZnode
522   */
523  private void transferQueues(String rsZnode) {
524    NodeFailoverWorker transfer =
525        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
526            this.clusterId);
527    try {
528      this.executor.execute(transfer);
529    } catch (RejectedExecutionException ex) {
530      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
531    }
532  }
533
534  /**
535   * Clear the references to the specified old source
536   * @param src source to clear
537   */
538  public void closeRecoveredQueue(ReplicationSourceInterface src) {
539    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
540    if (src instanceof ReplicationSource) {
541      ((ReplicationSource) src).getSourceMetrics().clear();
542    }
543    this.oldsources.remove(src);
544    deleteSource(src.getPeerClusterZnode(), false);
545    this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
546  }
547
548  /**
549   * Clear the references to the specified old source
550   * @param src source to clear
551   */
552  public void closeQueue(ReplicationSourceInterface src) {
553    LOG.info("Done with the queue " + src.getPeerClusterZnode());
554    src.getSourceMetrics().clear();
555    this.sources.remove(src);
556    deleteSource(src.getPeerClusterZnode(), true);
557    this.walsById.remove(src.getPeerClusterZnode());
558  }
559
560  /**
561   * Thie method first deletes all the recovered sources for the specified
562   * id, then deletes the normal source (deleting all related data in ZK).
563   * @param id The id of the peer cluster
564   */
565  public void removePeer(String id) {
566    LOG.info("Closing the following queue " + id + ", currently have "
567        + sources.size() + " and another "
568        + oldsources.size() + " that were recovered");
569    String terminateMessage = "Replication stream was removed by a user";
570    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
571    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
572    // see NodeFailoverWorker.run
573    synchronized (oldsources) {
574      // First close all the recovered sources for this peer
575      for (ReplicationSourceInterface src : oldsources) {
576        if (id.equals(src.getPeerId())) {
577          oldSourcesToDelete.add(src);
578        }
579      }
580      for (ReplicationSourceInterface src : oldSourcesToDelete) {
581        src.terminate(terminateMessage);
582        closeRecoveredQueue(src);
583      }
584    }
585    LOG.info("Number of deleted recovered sources for " + id + ": "
586        + oldSourcesToDelete.size());
587    // Now look for the one on this cluster
588    List<ReplicationSourceInterface> srcToRemove = new ArrayList<>();
589    // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
590    synchronized (this.replicationPeers) {
591      for (ReplicationSourceInterface src : this.sources) {
592        if (id.equals(src.getPeerId())) {
593          srcToRemove.add(src);
594        }
595      }
596      if (srcToRemove.isEmpty()) {
597        LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " +
598            "This could mean that ReplicationSourceInterface initialization failed for this peer " +
599            "and that replication on this peer may not be caught up. peerId=" + id);
600      }
601      for (ReplicationSourceInterface toRemove : srcToRemove) {
602        toRemove.terminate(terminateMessage);
603        closeQueue(toRemove);
604      }
605      deleteSource(id, true);
606    }
607  }
608
609  @Override
610  public void regionServerRemoved(String regionserver) {
611    transferQueues(regionserver);
612  }
613
614  @Override
615  public void peerRemoved(String peerId) {
616    removePeer(peerId);
617    this.replicationQueues.removePeerFromHFileRefs(peerId);
618  }
619
620  @Override
621  public void peerListChanged(List<String> peerIds) {
622    for (String id : peerIds) {
623      try {
624        boolean added = this.replicationPeers.peerConnected(id);
625        if (added) {
626          addSource(id);
627          if (replicationForBulkLoadDataEnabled) {
628            this.replicationQueues.addPeerToHFileRefs(id);
629          }
630        }
631      } catch (Exception e) {
632        LOG.error("Error while adding a new peer", e);
633      }
634    }
635  }
636
637  /**
638   * Class responsible to setup new ReplicationSources to take care of the
639   * queues from dead region servers.
640   */
641  class NodeFailoverWorker extends Thread {
642
643    private String rsZnode;
644    private final ReplicationQueues rq;
645    private final ReplicationPeers rp;
646    private final UUID clusterId;
647
648    /**
649     * @param rsZnode
650     */
651    public NodeFailoverWorker(String rsZnode) {
652      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
653    }
654
655    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
656        final ReplicationPeers replicationPeers, final UUID clusterId) {
657      super("Failover-for-"+rsZnode);
658      this.rsZnode = rsZnode;
659      this.rq = replicationQueues;
660      this.rp = replicationPeers;
661      this.clusterId = clusterId;
662    }
663
664    @Override
665    public void run() {
666      if (this.rq.isThisOurRegionServer(rsZnode)) {
667        return;
668      }
669      // Wait a bit before transferring the queues, we may be shutting down.
670      // This sleep may not be enough in some cases.
671      try {
672        Thread.sleep(sleepBeforeFailover +
673            (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
674      } catch (InterruptedException e) {
675        LOG.warn("Interrupted while waiting before transferring a queue.");
676        Thread.currentThread().interrupt();
677      }
678      // We try to lock that rs' queue directory
679      if (server.isStopped()) {
680        LOG.info("Not transferring queue since we are shutting down");
681        return;
682      }
683      Map<String, Set<String>> newQueues = new HashMap<>();
684      List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
685      while (peers != null && !peers.isEmpty()) {
686        Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
687          peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
688        long sleep = sleepBeforeFailover/2;
689        if (peer != null) {
690          newQueues.put(peer.getFirst(), peer.getSecond());
691          sleep = sleepBeforeFailover;
692        }
693        try {
694          Thread.sleep(sleep);
695        } catch (InterruptedException e) {
696          LOG.warn("Interrupted while waiting before transferring a queue.");
697          Thread.currentThread().interrupt();
698        }
699        peers = rq.getUnClaimedQueueIds(rsZnode);
700      }
701      if (peers != null) {
702        rq.removeReplicatorIfQueueIsEmpty(rsZnode);
703      }
704      // Copying over the failed queue is completed.
705      if (newQueues.isEmpty()) {
706        // We either didn't get the lock or the failed region server didn't have any outstanding
707        // WALs to replicate, so we are done.
708        return;
709      }
710
711      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
712        String peerId = entry.getKey();
713        Set<String> walsSet = entry.getValue();
714        try {
715          // there is not an actual peer defined corresponding to peerId for the failover.
716          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
717          String actualPeerId = replicationQueueInfo.getPeerId();
718          ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
719          ReplicationPeerConfig peerConfig = null;
720          try {
721            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
722          } catch (ReplicationException ex) {
723            LOG.warn("Received exception while getting replication peer config, skipping replay"
724                + ex);
725          }
726          if (peer == null || peerConfig == null) {
727            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
728            replicationQueues.removeQueue(peerId);
729            continue;
730          }
731          if (server instanceof ReplicationSyncUp.DummyServer
732              && peer.getPeerState().equals(PeerState.DISABLED)) {
733            LOG.warn("Peer {} is disbaled. ReplicationSyncUp tool will skip "
734                + "replicating data to this peer.",
735              actualPeerId);
736            continue;
737          }
738          // track sources in walsByIdRecoveredQueues
739          Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
740          walsByIdRecoveredQueues.put(peerId, walsByGroup);
741          for (String wal : walsSet) {
742            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
743            SortedSet<String> wals = walsByGroup.get(walPrefix);
744            if (wals == null) {
745              wals = new TreeSet<>();
746              walsByGroup.put(walPrefix, wals);
747            }
748            wals.add(wal);
749          }
750
751          // enqueue sources
752          ReplicationSourceInterface src =
753              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
754                server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
755          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
756          // see removePeer
757          synchronized (oldsources) {
758            if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
759              src.terminate("Recovered queue doesn't belong to any current peer");
760              closeRecoveredQueue(src);
761              continue;
762            }
763            oldsources.add(src);
764            for (String wal : walsSet) {
765              src.enqueueLog(new Path(oldLogDir, wal));
766            }
767            src.startup();
768          }
769        } catch (IOException e) {
770          // TODO manage it
771          LOG.error("Failed creating a source", e);
772        }
773      }
774    }
775  }
776
777  class AdoptAbandonedQueuesWorker extends Thread{
778
779    public AdoptAbandonedQueuesWorker() {}
780
781    @Override
782    public void run() {
783      List<String> currentReplicators = null;
784      try {
785        currentReplicators = replicationQueues.getListOfReplicators();
786      } catch (ReplicationException e) {
787        server.abort("Failed to get all replicators", e);
788        return;
789      }
790      if (currentReplicators == null || currentReplicators.isEmpty()) {
791        return;
792      }
793      List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
794      LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
795        + otherRegionServers);
796
797      // Look if there's anything to process after a restart
798      for (String rs : currentReplicators) {
799        if (!otherRegionServers.contains(rs)) {
800          transferQueues(rs);
801        }
802      }
803    }
804  }
805
806  /**
807   * Get the directory where wals are archived
808   * @return the directory where wals are archived
809   */
810  public Path getOldLogDir() {
811    return this.oldLogDir;
812  }
813
814  /**
815   * Get the directory where wals are stored by their RSs
816   * @return the directory where wals are stored by their RSs
817   */
818  public Path getLogDir() {
819    return this.logDir;
820  }
821
822  /**
823   * Get the handle on the local file system
824   * @return Handle on the local file system
825   */
826  public FileSystem getFs() {
827    return this.fs;
828  }
829
830  /**
831   * Get the ReplicationPeers used by this ReplicationSourceManager
832   * @return the ReplicationPeers used by this ReplicationSourceManager
833   */
834  public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
835
836  /**
837   * Get a string representation of all the sources' metrics
838   */
839  public String getStats() {
840    StringBuilder stats = new StringBuilder();
841    for (ReplicationSourceInterface source : sources) {
842      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
843      stats.append(source.getStats() + "\n");
844    }
845    for (ReplicationSourceInterface oldSource : oldsources) {
846      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId()+": ");
847      stats.append(oldSource.getStats()+ "\n");
848    }
849    return stats.toString();
850  }
851
852  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
853      throws ReplicationException {
854    for (ReplicationSourceInterface source : this.sources) {
855      source.addHFileRefs(tableName, family, pairs);
856    }
857  }
858
859  public void cleanUpHFileRefs(String peerId, List<String> files) {
860    this.replicationQueues.removeHFileRefs(peerId, files);
861  }
862}