View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.SortedMap;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.CopyOnWriteArrayList;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.RejectedExecutionException;
37  import java.util.concurrent.ThreadPoolExecutor;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.TableDescriptors;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.hbase.regionserver.HRegionServer;
49  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
50  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
51  import org.apache.hadoop.hbase.replication.ReplicationException;
52  import org.apache.hadoop.hbase.replication.ReplicationListener;
53  import org.apache.hadoop.hbase.replication.ReplicationPeer;
54  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
55  import org.apache.hadoop.hbase.replication.ReplicationPeers;
56  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
57  import org.apache.hadoop.hbase.replication.ReplicationQueues;
58  import org.apache.hadoop.hbase.replication.ReplicationTracker;
59  
60  import com.google.common.util.concurrent.ThreadFactoryBuilder;
61  
/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and our only goal is to finish
 * replicating the WAL queue it had up in ZK</li>
 * </ul>
 *
 * When a region server dies, this class uses a watcher to get notified and it
 * tries to grab a lock in order to transfer all the queues in a local
 * old source.
 *
 * This class implements the ReplicationListener interface so that it can track changes in
 * replication state.
 */
76  @InterfaceAudience.Private
77  public class ReplicationSourceManager implements ReplicationListener {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from died RSs
  private final List<ReplicationSourceInterface> oldsources;
  // Interface to the replication queues persisted in ZK
  private final ReplicationQueues replicationQueues;
  // Notifies us of region server and peer membership changes
  private final ReplicationTracker replicationTracker;
  // Known replication peers and their state
  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;
  // All logs we are currently tracking, keyed by peer id
  private final Map<String, SortedSet<String>> walsById;
  // Logs for recovered sources we are currently tracking, keyed by recovered queue id
  private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
  private final Configuration conf;
  private final FileSystem fs;
  // The path to the latest log we saw, for new coming sources
  private Path latestPath;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executer service for replication
  private final ThreadPoolExecutor executor;

  // Randomizes the extra wait before claiming a dead RS's queues (see NodeFailoverWorker)
  private final Random rand;


  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for the known replication peers
   * @param replicationTracker the tracker notifying us of RS/peer membership changes
   * @param conf the configuration to use
   * @param server the server for this region server
   * @param fs the file system to use
   * @param logDir the directory that contains all wal directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   * @param clusterId the UUID of the local cluster
   */
  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
      final Path oldLogDir, final UUID clusterId) {
    //CopyOnWriteArrayList is thread-safe.
    //Generally, reading is more than modifying.
    this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.replicationTracker = replicationTracker;
    this.server = server;
    // walsById is always accessed under synchronized (this.walsById); the recovered
    // map is touched from the failover worker thread, hence ConcurrentHashMap.
    this.walsById = new HashMap<String, SortedSet<String>>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
    this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover =
        conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
    this.clusterId = clusterId;
    // From now on this manager is notified of RS/peer changes (see the listener overrides).
    this.replicationTracker.registerListener(this);
    // Return value ignored; presumably primes/refreshes the peer cache — TODO confirm.
    this.replicationPeers.getAllPeerIds();
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.rand = new Random();
  }
160 
161   /**
162    * Provide the id of the peer and a log key and this method will figure which
163    * wal it belongs to and will log, for this region server, the current
164    * position. It will also clean old logs from the queue.
165    * @param log Path to the log currently being replicated from
166    * replication status in zookeeper. It will also delete older entries.
167    * @param id id of the peer cluster
168    * @param position current location in the log
169    * @param queueRecovered indicates if this queue comes from another region server
170    * @param holdLogInZK if true then the log is retained in ZK
171    */
172   public void logPositionAndCleanOldLogs(Path log, String id, long position,
173       boolean queueRecovered, boolean holdLogInZK) {
174     String fileName = log.getName();
175     this.replicationQueues.setLogPosition(id, fileName, position);
176     if (holdLogInZK) {
177      return;
178     }
179     cleanOldLogs(fileName, id, queueRecovered);
180   }
181 
182   /**
183    * Cleans a log file and all older files from ZK. Called when we are sure that a
184    * log file is closed and has no more entries.
185    * @param key Path to the log
186    * @param id id of the peer cluster
187    * @param queueRecovered Whether this is a recovered queue
188    */
189   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
190     if (queueRecovered) {
191       SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
192       if (wals != null && !wals.first().equals(key)) {
193         cleanOldLogs(wals, key, id);
194       }
195     } else {
196       synchronized (this.walsById) {
197         SortedSet<String> wals = walsById.get(id);
198         if (!wals.first().equals(key)) {
199           cleanOldLogs(wals, key, id);
200         }
201       }
202     }
203  }
204 
205   private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
206     SortedSet<String> walSet = wals.headSet(key);
207     LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
208     for (String wal : walSet) {
209       this.replicationQueues.removeLog(id, wal);
210     }
211     walSet.clear();
212   }
213 
214   /**
215    * Adds a normal source per registered peer cluster and tries to process all
216    * old region server wal queues
217    */
218   protected void init() throws IOException, ReplicationException {
219     for (String id : this.replicationPeers.getPeerIds()) {
220       addSource(id);
221     }
222     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
223     if (currentReplicators == null || currentReplicators.size() == 0) {
224       return;
225     }
226     List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
227     LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
228         + otherRegionServers);
229 
230     // Look if there's anything to process after a restart
231     for (String rs : currentReplicators) {
232       if (!otherRegionServers.contains(rs)) {
233         transferQueues(rs);
234       }
235     }
236   }
237 
  /**
   * Add a new normal source to this region server
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException if the source cannot be created
   * @throws ReplicationException if the latest wal cannot be registered in the queue
   */
  protected ReplicationSourceInterface addSource(String id) throws IOException,
      ReplicationException {
    ReplicationPeerConfig peerConfig
      = replicationPeers.getReplicationPeerConfig(id);
    ReplicationPeer peer = replicationPeers.getPeer(id);
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
    // Hold the walsById lock so a concurrent log roll (preLogRoll) cannot
    // interleave with registering the latest wal for this new source.
    synchronized (this.walsById) {
      this.sources.add(src);
      this.walsById.put(id, new TreeSet<String>());
      // Add the latest wal to that source's queue
      if (this.latestPath != null) {
        String name = this.latestPath.getName();
        this.walsById.get(id).add(name);
        try {
          // Persist the wal in the peer's ZK queue before the source starts, so a
          // file is never replicated without being tracked.
          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
        } catch (ReplicationException e) {
          // Failing to track the wal is fatal: stop the server rather than
          // silently replicate an untracked file.
          String message =
              "Cannot add log to queue when creating a new source, queueId="
                  + src.getPeerClusterZnode() + ", filename=" + name;
          server.stop(message);
          throw e;
        }
        src.enqueueLog(this.latestPath);
      }
    }
    // Start the source's worker thread outside the lock.
    src.startup();
    return src;
  }
274 
275   /**
276    * Delete a complete queue of wals associated with a peer cluster
277    * @param peerId Id of the peer cluster queue of wals to delete
278    */
279   public void deleteSource(String peerId, boolean closeConnection) {
280     this.replicationQueues.removeQueue(peerId);
281     if (closeConnection) {
282       this.replicationPeers.peerRemoved(peerId);
283     }
284   }
285 
286   /**
287    * Terminate the replication on this region server
288    */
289   public void join() {
290     this.executor.shutdown();
291     if (this.sources.size() == 0) {
292       this.replicationQueues.removeAllQueues();
293     }
294     for (ReplicationSourceInterface source : this.sources) {
295       source.terminate("Region server is closing");
296     }
297   }
298 
  /**
   * Get an unmodifiable view of all the wals tracked for the normal sources on
   * this rs, keyed by peer id. (Note: this is a view, not a copy — it reflects
   * later changes to the underlying map.)
   * @return map of peer id to sorted set of wal names
   */
  protected Map<String, SortedSet<String>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }
306 
  /**
   * Get an unmodifiable view of the wals tracked for the recovered sources on
   * this rs, keyed by recovered queue id.
   * @return map of recovered queue id to sorted set of wal names
   */
  protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }
314 
  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }
322 
  /**
   * Get a list of all the old (recovered) sources of this rs
   * @return list of all old sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }
330 
331   void preLogRoll(Path newLog) throws IOException {
332     synchronized (this.walsById) {
333       String name = newLog.getName();
334       for (ReplicationSourceInterface source : this.sources) {
335         try {
336           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
337         } catch (ReplicationException e) {
338           throw new IOException("Cannot add log to replication queue with id="
339               + source.getPeerClusterZnode() + ", filename=" + name, e);
340         }
341       }
342       for (SortedSet<String> wals : this.walsById.values()) {
343         if (this.sources.isEmpty()) {
344           // If there's no slaves, don't need to keep the old wals since
345           // we only consider the last one when a new slave comes in
346           wals.clear();
347         }
348         wals.add(name);
349       }
350     }
351 
352     this.latestPath = newLog;
353   }
354 
355   void postLogRoll(Path newLog) throws IOException {
356     // This only updates the sources we own, not the recovered ones
357     for (ReplicationSourceInterface source : this.sources) {
358       source.enqueueLog(newLog);
359     }
360   }
361 
  /**
   * Factory method to create a replication source
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
   * @param replicationQueues the interface for manipulating replication queues
   * @param replicationPeers the interface for the known replication peers
   * @param server the server object for this region server
   * @param peerId the id of the peer cluster
   * @param clusterId the UUID of the local cluster
   * @param peerConfig configuration of the peer (may name a custom endpoint class)
   * @param replicationPeer the peer the source will replicate to
   * @return the created source
   * @throws IOException if the replication endpoint cannot be instantiated
   */
  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
      final FileSystem fs, final ReplicationSourceManager manager,
      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
      final Server server, final String peerId, final UUID clusterId,
      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
          throws IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    TableDescriptors tableDescriptors = null;
    // Only a real region server carries a coprocessor host and table descriptors.
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    ReplicationSourceInterface src;
    try {
      // The source implementation is pluggable via configuration; a broken
      // setting falls back to the default rather than failing the RS.
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication source implementation throws errors, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }

    ReplicationEndpoint replicationEndpoint = null;
    try {
      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
      if (replicationEndpointImpl == null) {
        // Default to HBase inter-cluster replication endpoint
        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
      }
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(replicationEndpointImpl);
      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
      if(rsServerHost != null) {
        // Coprocessors get a chance to replace the endpoint.
        ReplicationEndpoint newReplicationEndPoint = rsServerHost
            .postCreateReplicationEndPoint(replicationEndpoint);
        if(newReplicationEndPoint != null) {
          // Override the newly created endpoint from the hook with configured end point
          replicationEndpoint = newReplicationEndPoint;
        }
      }
    } catch (Exception e) {
      // Unlike the source above, a bad endpoint is fatal for this peer.
      LOG.warn("Passed replication endpoint implementation throws errors", e);
      throw new IOException(e);
    }

    MetricsSource metrics = new MetricsSource(peerId);
    // init replication source
    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
      clusterId, replicationEndpoint, metrics);

    // init replication endpoint
    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
      fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));

    return src;
  }
430 
431   /**
432    * Transfer all the queues of the specified to this region server.
433    * First it tries to grab a lock and if it works it will move the
434    * znodes and finally will delete the old znodes.
435    *
436    * It creates one old source for any type of source of the old rs.
437    * @param rsZnode
438    */
439   private void transferQueues(String rsZnode) {
440     NodeFailoverWorker transfer =
441         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
442             this.clusterId);
443     try {
444       this.executor.execute(transfer);
445     } catch (RejectedExecutionException ex) {
446       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
447     }
448   }
449 
450   /**
451    * Clear the references to the specified old source
452    * @param src source to clear
453    */
454   public void closeRecoveredQueue(ReplicationSourceInterface src) {
455     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
456     this.oldsources.remove(src);
457     deleteSource(src.getPeerClusterZnode(), false);
458     this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
459   }
460 
461   /**
462    * Thie method first deletes all the recovered sources for the specified
463    * id, then deletes the normal source (deleting all related data in ZK).
464    * @param id The id of the peer cluster
465    */
466   public void removePeer(String id) {
467     LOG.info("Closing the following queue " + id + ", currently have "
468         + sources.size() + " and another "
469         + oldsources.size() + " that were recovered");
470     String terminateMessage = "Replication stream was removed by a user";
471     ReplicationSourceInterface srcToRemove = null;
472     List<ReplicationSourceInterface> oldSourcesToDelete =
473         new ArrayList<ReplicationSourceInterface>();
474     // First close all the recovered sources for this peer
475     for (ReplicationSourceInterface src : oldsources) {
476       if (id.equals(src.getPeerClusterId())) {
477         oldSourcesToDelete.add(src);
478       }
479     }
480     for (ReplicationSourceInterface src : oldSourcesToDelete) {
481       src.terminate(terminateMessage);
482       closeRecoveredQueue((src));
483     }
484     LOG.info("Number of deleted recovered sources for " + id + ": "
485         + oldSourcesToDelete.size());
486     // Now look for the one on this cluster
487     for (ReplicationSourceInterface src : this.sources) {
488       if (id.equals(src.getPeerClusterId())) {
489         srcToRemove = src;
490         break;
491       }
492     }
493     if (srcToRemove == null) {
494       LOG.error("The queue we wanted to close is missing " + id);
495       return;
496     }
497     srcToRemove.terminate(terminateMessage);
498     this.sources.remove(srcToRemove);
499     deleteSource(id, true);
500   }
501 
  // ReplicationListener callback: a region server disappeared, try to claim its queues.
  @Override
  public void regionServerRemoved(String regionserver) {
    transferQueues(regionserver);
  }
506 
  // ReplicationListener callback: a peer was deleted, tear down its sources and queues.
  @Override
  public void peerRemoved(String peerId) {
    removePeer(peerId);
  }
511 
512   @Override
513   public void peerListChanged(List<String> peerIds) {
514     for (String id : peerIds) {
515       try {
516         boolean added = this.replicationPeers.peerAdded(id);
517         if (added) {
518           addSource(id);
519         }
520       } catch (Exception e) {
521         LOG.error("Error while adding a new peer", e);
522       }
523     }
524   }
525 
526   /**
527    * Class responsible to setup new ReplicationSources to take care of the
528    * queues from dead region servers.
529    */
530   class NodeFailoverWorker extends Thread {
531 
532     private String rsZnode;
533     private final ReplicationQueues rq;
534     private final ReplicationPeers rp;
535     private final UUID clusterId;
536 
537     /**
538      *
539      * @param rsZnode
540      */
541     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
542         final ReplicationPeers replicationPeers, final UUID clusterId) {
543       super("Failover-for-"+rsZnode);
544       this.rsZnode = rsZnode;
545       this.rq = replicationQueues;
546       this.rp = replicationPeers;
547       this.clusterId = clusterId;
548     }
549 
550     @Override
551     public void run() {
552       if (this.rq.isThisOurZnode(rsZnode)) {
553         return;
554       }
555       // Wait a bit before transferring the queues, we may be shutting down.
556       // This sleep may not be enough in some cases.
557       try {
558         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
559       } catch (InterruptedException e) {
560         LOG.warn("Interrupted while waiting before transferring a queue.");
561         Thread.currentThread().interrupt();
562       }
563       // We try to lock that rs' queue directory
564       if (server.isStopped()) {
565         LOG.info("Not transferring queue since we are shutting down");
566         return;
567       }
568       SortedMap<String, SortedSet<String>> newQueues = null;
569 
570       newQueues = this.rq.claimQueues(rsZnode);
571 
572       // Copying over the failed queue is completed.
573       if (newQueues.isEmpty()) {
574         // We either didn't get the lock or the failed region server didn't have any outstanding
575         // WALs to replicate, so we are done.
576         return;
577       }
578 
579       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
580         String peerId = entry.getKey();
581         try {
582           // there is not an actual peer defined corresponding to peerId for the failover.
583           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
584           String actualPeerId = replicationQueueInfo.getPeerId();
585           ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
586           ReplicationPeerConfig peerConfig = null;
587           try {
588             peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
589           } catch (ReplicationException ex) {
590             LOG.warn("Received exception while getting replication peer config, skipping replay"
591                 + ex);
592           }
593           if (peer == null || peerConfig == null) {
594             LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
595             continue;
596           }
597 
598           ReplicationSourceInterface src =
599               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
600                 server, peerId, this.clusterId, peerConfig, peer);
601           if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
602             src.terminate("Recovered queue doesn't belong to any current peer");
603             break;
604           }
605           oldsources.add(src);
606           SortedSet<String> walsSet = entry.getValue();
607           for (String wal : walsSet) {
608             src.enqueueLog(new Path(oldLogDir, wal));
609           }
610           src.startup();
611           walsByIdRecoveredQueues.put(peerId, walsSet);
612         } catch (IOException e) {
613           // TODO manage it
614           LOG.error("Failed creating a source", e);
615         }
616       }
617     }
618   }
619 
  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }
627 
  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }
635 
  /**
   * Get the handle on the file system this manager was constructed with
   * @return handle on the file system
   */
  public FileSystem getFs() {
    return this.fs;
  }
643 
644   /**
645    * Get a string representation of all the sources' metrics
646    */
647   public String getStats() {
648     StringBuffer stats = new StringBuffer();
649     for (ReplicationSourceInterface source : sources) {
650       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
651       stats.append(source.getStats() + "\n");
652     }
653     for (ReplicationSourceInterface oldSource : oldsources) {
654       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
655       stats.append(oldSource.getStats()+ "\n");
656     }
657     return stats.toString();
658   }
659 }