/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;

/**
 * This class is responsible for managing all the replication sources. There
 * are two kinds of sources:
 * <ul>
 * <li>Normal sources, which are persistent and have one instance per peer cluster</li>
 * <li>Old sources, which are recovered from a failed region server; their only
 * goal is to finish replicating the WAL queue that server had registered in ZK</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all the queues to a local
 * old source.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
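 *
 * <p>A rough sketch of how a region server drives this manager; the variable
 * names are illustrative, not part of this class:
 * <pre>{@code
 * ReplicationSourceManager manager = new ReplicationSourceManager(queues, peers,
 *     tracker, conf, server, fs, logDir, oldLogDir, clusterId);
 * manager.init();              // one source per peer, plus claiming dead RS queues
 * manager.preLogRoll(newWal);  // register the new wal before each roll
 * manager.postLogRoll(newWal); // then hand it to every source
 * manager.join();              // terminate all sources on shutdown
 * }</pre>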
 */
@InterfaceAudience.Private
public class ReplicationSourceManager implements ReplicationListener {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;
  private final ReplicationQueues replicationQueues;
  private final ReplicationTracker replicationTracker;
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;
  // All logs we are currently tracking
  private final Map<String, SortedSet<String>> walsById;
  // Logs for recovered sources we are currently tracking
  private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The path to the latest log we saw, for newly created sources
  private Path latestPath;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final Random rand;

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param replicationTracker the tracker used to watch for replication state changes
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of this cluster
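   *
   * <p>Two tuning knobs are read from the configuration in this constructor; a
   * sketch with the defaults spelled out:
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * // wait before claiming a dead RS's queues (default 30000 ms, HBASE-3596)
   * conf.setLong("replication.sleep.before.failover", 30000);
   * // number of concurrent queue-transfer workers (default 1)
   * conf.setInt("replication.executor.workers", 1);
   * }</pre>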
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) {
    // CopyOnWriteArrayList is thread-safe; reads vastly outnumber writes here.
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    this.walsById = new HashMap<String, SortedSet<String>>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover =
        conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
    this.clusterId = clusterId;
    this.replicationTracker.registerListener(this);
    this.replicationPeers.getAllPeerIds();
    // It's preferable to fail over one RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms keep-alive since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
  }


  /**
   * Provide the id of the peer and a log key and this method will figure out which
   * wal the key belongs to and will log, for this region server, the current
   * position. It will also clean old logs from the queue.
   * @param log Path to the log currently being replicated
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
   * @param holdLogInZK if true then the log is retained in ZK
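   *
   * <p>A minimal usage sketch; the peer id and offset are illustrative:
   * <pre>{@code
   * // peer "1" has consumed the current wal up to the given byte offset;
   * // persist that position and drop strictly older wals from the queue
   * manager.logPositionAndCleanOldLogs(currentWal, "1", position, false, false);
   * }</pre>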
   */
  public void logPositionAndCleanOldLogs(Path log, String id, long position,
      boolean queueRecovered, boolean holdLogInZK) {
    String fileName = log.getName();
    this.replicationQueues.setLogPosition(id, fileName, position);
    if (holdLogInZK) {
      return;
    }
    cleanOldLogs(fileName, id, queueRecovered);
  }

  /**
   * Cleans a log file and all older files from ZK. Called when we are sure that a
   * log file is closed and has no more entries.
   * @param key Path to the log
   * @param id id of the peer cluster
   * @param queueRecovered Whether this is a recovered queue
   */
  public void cleanOldLogs(String key, String id, boolean queueRecovered) {
    if (queueRecovered) {
      SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
      if (wals != null && !wals.first().equals(key)) {
        cleanOldLogs(wals, key, id);
      }
    } else {
      synchronized (this.walsById) {
        SortedSet<String> wals = walsById.get(id);
        if (!wals.first().equals(key)) {
          cleanOldLogs(wals, key, id);
        }
      }
    }
  }

  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
    // headSet() is a view of the wals strictly older than key; clearing it
    // also removes those entries from the underlying set.
    SortedSet<String> walSet = wals.headSet(key);
    LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
    for (String wal : walSet) {
      this.replicationQueues.removeLog(id, wal);
    }
    walSet.clear();
  }

  /**
   * Adds a normal source per registered peer cluster and tries to process all
   * old region server wal queues
   */
  protected void init() throws IOException, ReplicationException {
    for (String id : this.replicationPeers.getPeerIds()) {
      addSource(id);
    }
    List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
    if (currentReplicators == null || currentReplicators.size() == 0) {
      return;
    }
    List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
        + otherRegionServers);

    // See if there's anything to process after a restart
    for (String rs : currentReplicators) {
      if (!otherRegionServers.contains(rs)) {
        transferQueues(rs);
      }
    }
  }

  /**
   * Add a new normal source to this region server
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationPeerConfig peerConfig
      = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getPeer(id);
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
    synchronized (this.walsById) {
      this.sources.add(src);
      this.walsById.put(id, new TreeSet<String>());
      // Add the latest wal to that source's queue
      if (this.latestPath != null) {
        String name = this.latestPath.getName();
        this.walsById.get(id).add(name);
        try {
          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          String message =
              "Cannot add log to queue when creating a new source, queueId="
                  + src.getPeerClusterZnode() + ", filename=" + name;
          server.stop(message);
          throw e;
        }
        src.enqueueLog(this.latestPath);
      }
    }
    src.startup();
    return src;
  }

  /**
   * Delete a complete queue of wals associated with a peer cluster
   * @param peerId Id of the peer cluster queue of wals to delete
   * @param closeConnection if true, also notify that the peer was removed so its
   *        resources can be released
   */
  public void deleteSource(String peerId, boolean closeConnection) {
    this.replicationQueues.removeQueue(peerId);
    if (closeConnection) {
      this.replicationPeers.peerRemoved(peerId);
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    if (this.sources.size() == 0) {
      this.replicationQueues.removeAllQueues();
    }
    for (ReplicationSourceInterface source : this.sources) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get a copy of the wals of the normal sources on this rs
   * @return a map of peer cluster id to a sorted set of wal names
   */
  protected Map<String, SortedSet<String>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get a copy of the wals of the recovered sources on this rs
   * @return a map of recovered queue id to a sorted set of wal names
   */
  protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }

  /**
   * Get a list of all the old sources of this rs
   * @return list of all old sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  @VisibleForTesting
  List<String> getAllQueues() {
    return replicationQueues.getAllQueues();
  }

  void preLogRoll(Path newLog) throws IOException {
    synchronized (this.walsById) {
      String name = newLog.getName();
      for (ReplicationSourceInterface source : this.sources) {
        try {
          this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          throw new IOException("Cannot add log to replication queue with id="
              + source.getPeerClusterZnode() + ", filename=" + name, e);
        }
      }
      for (SortedSet<String> wals : this.walsById.values()) {
        if (this.sources.isEmpty()) {
          // If there are no slaves, we don't need to keep the old wals since
          // we only consider the last one when a new slave comes in
          wals.clear();
        }
        wals.add(name);
      }
    }

    this.latestPath = newLog;
  }

  void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources) {
      source.enqueueLog(newLog);
    }
  }

  /**
   * Factory method to create a replication source
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for manipulating replication peers
   * @param server the server object for this region server
   * @param peerId the id of the peer cluster
   * @param clusterId the UUID of this cluster
   * @param peerConfig the configuration of the peer cluster
   * @param replicationPeer the peer to replicate to
   * @return the created source
   * @throws IOException
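   *
   * <p>The source class is pluggable through configuration; a sketch of swapping
   * in a custom implementation (the class name here is hypothetical):
   * <pre>{@code
   * // must name a class implementing ReplicationSourceInterface
   * conf.set("replication.replicationsource.implementation",
   *     "org.example.MyReplicationSource");
   * }</pre>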
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Server server, final String peerId, final UUID clusterId,
      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
          throws IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication source implementation cannot be instantiated, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }

    ReplicationEndpoint replicationEndpoint = null;
    try {
      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
      if (replicationEndpointImpl == null) {
        // Default to HBase inter-cluster replication endpoint
        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
      }
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(replicationEndpointImpl);
      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
      if (rsServerHost != null) {
        ReplicationEndpoint newReplicationEndPoint = rsServerHost
            .postCreateReplicationEndPoint(replicationEndpoint);
        if (newReplicationEndPoint != null) {
          // Replace the endpoint we created with the one returned by the
          // coprocessor hook, if the hook provided one
          replicationEndpoint = newReplicationEndPoint;
        }
      }
    } catch (Exception e) {
      LOG.warn("Passed replication endpoint implementation cannot be instantiated", e);
      throw new IOException(e);
    }

    MetricsSource metrics = new MetricsSource(peerId);
    // init replication source
    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
      clusterId, replicationEndpoint, metrics);

    // init replication endpoint
    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
      fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));

    return src;
  }

  /**
   * Transfer all the queues of the specified region server to this one.
   * First it tries to grab a lock and if it works it will move the
   * znodes and finally will delete the old znodes.
   *
   * It creates one old source for any type of source of the old rs.
   * @param rsZnode the znode of the dead region server
   */
  private void transferQueues(String rsZnode) {
    NodeFailoverWorker transfer =
        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
            this.clusterId);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
    }
  }

  /**
   * Clear the references to the specified old source
   * @param src source to clear
   */
  public void closeRecoveredQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
    this.oldsources.remove(src);
    deleteSource(src.getPeerClusterZnode(), false);
    this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
  }

  /**
   * This method first deletes all the recovered sources for the specified
   * id, then deletes the normal source (deleting all related data in ZK).
   * @param id The id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (id.equals(src.getPeerClusterId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        closeRecoveredQueue(src);
      }
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
    for (ReplicationSourceInterface src : this.sources) {
      if (id.equals(src.getPeerClusterId())) {
        srcToRemove.add(src);
      }
    }
    if (srcToRemove.size() == 0) {
      LOG.error("The queue we wanted to close is missing " + id);
      return;
    }
    for (ReplicationSourceInterface toRemove : srcToRemove) {
      toRemove.terminate(terminateMessage);
      this.sources.remove(toRemove);
    }
    deleteSource(id, true);
  }

  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }

  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
  }

  @Override
  public void peerListChanged(List<String> peerIds) {
    for (String id : peerIds) {
      try {
        boolean added = this.replicationPeers.peerAdded(id);
        if (added) {
          addSource(id);
        }
      } catch (Exception e) {
        LOG.error("Error while adding a new peer", e);
      }
    }
  }

  /**
   * Class responsible for setting up new ReplicationSources to take care of the
   * queues from dead region servers.
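   *
   * <p>Each worker first sleeps a randomized delay so that the live region
   * servers don't all race for the same dead server's queues; the jitter used
   * in {@link #run()} is:
   * <pre>{@code
   * // uniform in [sleepBeforeFailover, 2 * sleepBeforeFailover)
   * Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
   * }</pre>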
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;
    private final ReplicationQueues rq;
    private final ReplicationPeers rp;
    private final UUID clusterId;

    /**
     * @param rsZnode the znode of the dead region server whose queues to claim
     */
    public NodeFailoverWorker(String rsZnode) {
      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
    }

    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
        final ReplicationPeers replicationPeers, final UUID clusterId) {
      super("Failover-for-" + rsZnode);
      this.rsZnode = rsZnode;
      this.rq = replicationQueues;
      this.rp = replicationPeers;
      this.clusterId = clusterId;
    }

    @Override
    public void run() {
      if (this.rq.isThisOurZnode(rsZnode)) {
        return;
      }
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (server.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      SortedMap<String, SortedSet<String>> newQueues = this.rq.claimQueues(rsZnode);

      // Copying over the failed queue is completed.
      if (newQueues.isEmpty()) {
        // We either didn't get the lock or the failed region server didn't have any outstanding
        // WALs to replicate, so we are done.
        return;
      }

      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        SortedSet<String> walsSet = entry.getValue();
        try {
          // the id of a claimed queue may carry dead RS names, so it may not match
          // an actual peer id; extract the real peer id before looking it up
          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
          String actualPeerId = replicationQueueInfo.getPeerId();
          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
          ReplicationPeerConfig peerConfig = null;
          try {
            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
          } catch (ReplicationException ex) {
            LOG.warn("Received exception while getting replication peer config, skipping replay",
                ex);
          }
          if (peer == null || peerConfig == null) {
            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + rsZnode);
            replicationQueues.removeQueue(peerId);
            continue;
          }

          ReplicationSourceInterface src =
              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                server, peerId, this.clusterId, peerConfig, peer);
          // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
          // see removePeer
          synchronized (oldsources) {
            if (!this.rp.getPeerIds().contains(src.getPeerClusterId())) {
              src.terminate("Recovered queue doesn't belong to any current peer");
              closeRecoveredQueue(src);
              continue;
            }
            oldsources.add(src);
            for (String wal : walsSet) {
              src.enqueueLog(new Path(oldLogDir, wal));
            }
            src.startup();
          }
          walsByIdRecoveredQueues.put(peerId, walsSet);
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    for (ReplicationSourceInterface source : sources) {
      stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) "
          + oldSource.getPeerClusterId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }
}