View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.SortedMap;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.CopyOnWriteArrayList;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.RejectedExecutionException;
37  import java.util.concurrent.ThreadPoolExecutor;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.regionserver.HRegionServer;
48  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
49  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationListener;
52  import org.apache.hadoop.hbase.replication.ReplicationPeer;
53  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
54  import org.apache.hadoop.hbase.replication.ReplicationPeers;
55  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
56  import org.apache.hadoop.hbase.replication.ReplicationQueues;
57  import org.apache.hadoop.hbase.replication.ReplicationTracker;
58  
59  import com.google.common.util.concurrent.ThreadFactoryBuilder;
60  
61  /**
62   * This class is responsible to manage all the replication
63   * sources. There are two classes of sources:
64   * <li> Normal sources are persistent and one per peer cluster</li>
65   * <li> Old sources are recovered from a failed region server and our
66   * only goal is to finish replicating the HLog queue it had up in ZK</li>
67   *
68   * When a region server dies, this class uses a watcher to get notified and it
69   * tries to grab a lock in order to transfer all the queues in a local
70   * old source.
71   *
72   * This class implements the ReplicationListener interface so that it can track changes in
73   * replication state.
74   */
75  @InterfaceAudience.Private
76  public class ReplicationSourceManager implements ReplicationListener {
77    private static final Log LOG =
78        LogFactory.getLog(ReplicationSourceManager.class);
79    // List of all the sources that read this RS's logs
80    private final List<ReplicationSourceInterface> sources;
81    // List of all the sources we got from died RSs
82    private final List<ReplicationSourceInterface> oldsources;
83    private final ReplicationQueues replicationQueues;
84    private final ReplicationTracker replicationTracker;
85    private final ReplicationPeers replicationPeers;
86    // UUID for this cluster
87    private final UUID clusterId;
88    // All about stopping
89    private final Server server;
90    // All logs we are currently tracking
91    private final Map<String, SortedSet<String>> hlogsById;
92    // Logs for recovered sources we are currently tracking
93    private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
94    private final Configuration conf;
95    private final FileSystem fs;
96    // The path to the latest log we saw, for new coming sources
97    private Path latestPath;
98    // Path to the hlogs directories
99    private final Path logDir;
100   // Path to the hlog archive
101   private final Path oldLogDir;
102   // The number of ms that we wait before moving znodes, HBASE-3596
103   private final long sleepBeforeFailover;
104   // Homemade executer service for replication
105   private final ThreadPoolExecutor executor;
106 
107   private final Random rand;
108 
109 
110   /**
111    * Creates a replication manager and sets the watch on all the other registered region servers
112    * @param replicationQueues the interface for manipulating replication queues
113    * @param replicationPeers
114    * @param replicationTracker
115    * @param conf the configuration to use
116    * @param server the server for this region server
117    * @param fs the file system to use
118    * @param logDir the directory that contains all hlog directories of live RSs
119    * @param oldLogDir the directory where old logs are archived
120    * @param clusterId
121    */
122   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
123       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
124       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
125       final Path oldLogDir, final UUID clusterId) {
126     //CopyOnWriteArrayList is thread-safe.
127     //Generally, reading is more than modifying.
128     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
129     this.replicationQueues = replicationQueues;
130     this.replicationPeers = replicationPeers;
131     this.replicationTracker = replicationTracker;
132     this.server = server;
133     this.hlogsById = new HashMap<String, SortedSet<String>>();
134     this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
135     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
136     this.conf = conf;
137     this.fs = fs;
138     this.logDir = logDir;
139     this.oldLogDir = oldLogDir;
140     this.sleepBeforeFailover =
141         conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
142     this.clusterId = clusterId;
143     this.replicationTracker.registerListener(this);
144     this.replicationPeers.getAllPeerIds();
145     // It's preferable to failover 1 RS at a time, but with good zk servers
146     // more could be processed at the same time.
147     int nbWorkers = conf.getInt("replication.executor.workers", 1);
148     // use a short 100ms sleep since this could be done inline with a RS startup
149     // even if we fail, other region servers can take care of it
150     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
151         100, TimeUnit.MILLISECONDS,
152         new LinkedBlockingQueue<Runnable>());
153     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
154     tfb.setNameFormat("ReplicationExecutor-%d");
155     this.executor.setThreadFactory(tfb.build());
156     this.rand = new Random();
157   }
158 
159   /**
160    * Provide the id of the peer and a log key and this method will figure which
161    * hlog it belongs to and will log, for this region server, the current
162    * position. It will also clean old logs from the queue.
163    * @param log Path to the log currently being replicated from
164    * replication status in zookeeper. It will also delete older entries.
165    * @param id id of the peer cluster
166    * @param position current location in the log
167    * @param queueRecovered indicates if this queue comes from another region server
168    * @param holdLogInZK if true then the log is retained in ZK
169    */
170   public void logPositionAndCleanOldLogs(Path log, String id, long position,
171       boolean queueRecovered, boolean holdLogInZK) {
172     String fileName = log.getName();
173     this.replicationQueues.setLogPosition(id, fileName, position);
174     if (holdLogInZK) {
175      return;
176     }
177     cleanOldLogs(fileName, id, queueRecovered);
178   }
179 
180   /**
181    * Cleans a log file and all older files from ZK. Called when we are sure that a
182    * log file is closed and has no more entries.
183    * @param key Path to the log
184    * @param id id of the peer cluster
185    * @param queueRecovered Whether this is a recovered queue
186    */
187   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
188     if (queueRecovered) {
189       SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
190       if (hlogs != null && !hlogs.first().equals(key)) {
191         cleanOldLogs(hlogs, key, id);
192       }
193     } else {
194       synchronized (this.hlogsById) {
195         SortedSet<String> hlogs = hlogsById.get(id);
196         if (!hlogs.first().equals(key)) {
197           cleanOldLogs(hlogs, key, id);
198         }
199       }
200     }
201  }
202   
203   private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
204     SortedSet<String> hlogSet = hlogs.headSet(key);
205     LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
206     for (String hlog : hlogSet) {
207       this.replicationQueues.removeLog(id, hlog);
208     }
209     hlogSet.clear();
210   }
211 
212   /**
213    * Adds a normal source per registered peer cluster and tries to process all
214    * old region server hlog queues
215    */
216   protected void init() throws IOException, ReplicationException {
217     for (String id : this.replicationPeers.getPeerIds()) {
218       addSource(id);
219     }
220     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
221     if (currentReplicators == null || currentReplicators.size() == 0) {
222       return;
223     }
224     List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
225     LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
226         + otherRegionServers);
227 
228     // Look if there's anything to process after a restart
229     for (String rs : currentReplicators) {
230       if (!otherRegionServers.contains(rs)) {
231         transferQueues(rs);
232       }
233     }
234   }
235 
236   /**
237    * Add a new normal source to this region server
238    * @param id the id of the peer cluster
239    * @return the source that was created
240    * @throws IOException
241    */
242   protected ReplicationSourceInterface addSource(String id) throws IOException,
243       ReplicationException {
244     ReplicationPeerConfig peerConfig
245       = replicationPeers.getReplicationPeerConfig(id);
246     ReplicationPeer peer = replicationPeers.getPeer(id);
247     ReplicationSourceInterface src =
248         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
249           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
250     synchronized (this.hlogsById) {
251       this.sources.add(src);
252       this.hlogsById.put(id, new TreeSet<String>());
253       // Add the latest hlog to that source's queue
254       if (this.latestPath != null) {
255         String name = this.latestPath.getName();
256         this.hlogsById.get(id).add(name);
257         try {
258           this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
259         } catch (ReplicationException e) {
260           String message =
261               "Cannot add log to queue when creating a new source, queueId="
262                   + src.getPeerClusterZnode() + ", filename=" + name;
263           server.stop(message);
264           throw e;
265         }
266         src.enqueueLog(this.latestPath);
267       }
268     }
269     src.startup();
270     return src;
271   }
272 
273   /**
274    * Delete a complete queue of hlogs associated with a peer cluster
275    * @param peerId Id of the peer cluster queue of hlogs to delete
276    */
277   public void deleteSource(String peerId, boolean closeConnection) {
278     this.replicationQueues.removeQueue(peerId);
279     if (closeConnection) {
280       this.replicationPeers.peerRemoved(peerId);
281     }
282   }
283 
284   /**
285    * Terminate the replication on this region server
286    */
287   public void join() {
288     this.executor.shutdown();
289     if (this.sources.size() == 0) {
290       this.replicationQueues.removeAllQueues();
291     }
292     for (ReplicationSourceInterface source : this.sources) {
293       source.terminate("Region server is closing");
294     }
295   }
296 
297   /**
298    * Get a copy of the hlogs of the first source on this rs
299    * @return a sorted set of hlog names
300    */
301   protected Map<String, SortedSet<String>> getHLogs() {
302     return Collections.unmodifiableMap(hlogsById);
303   }
304   
305   /**
306    * Get a copy of the hlogs of the recovered sources on this rs
307    * @return a sorted set of hlog names
308    */
309   protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
310     return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
311   }
312 
313   /**
314    * Get a list of all the normal sources of this rs
315    * @return lis of all sources
316    */
317   public List<ReplicationSourceInterface> getSources() {
318     return this.sources;
319   }
320 
321   /**
322    * Get a list of all the old sources of this rs
323    * @return list of all old sources
324    */
325   public List<ReplicationSourceInterface> getOldSources() {
326     return this.oldsources;
327   }
328 
329   void preLogRoll(Path newLog) throws IOException {
330     synchronized (this.hlogsById) {
331       String name = newLog.getName();
332       for (ReplicationSourceInterface source : this.sources) {
333         try {
334           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
335         } catch (ReplicationException e) {
336           throw new IOException("Cannot add log to replication queue with id="
337               + source.getPeerClusterZnode() + ", filename=" + name, e);
338         }
339       }
340       for (SortedSet<String> hlogs : this.hlogsById.values()) {
341         if (this.sources.isEmpty()) {
342           // If there's no slaves, don't need to keep the old hlogs since
343           // we only consider the last one when a new slave comes in
344           hlogs.clear();
345         }
346         hlogs.add(name);
347       }
348     }
349 
350     this.latestPath = newLog;
351   }
352 
353   void postLogRoll(Path newLog) throws IOException {
354     // This only updates the sources we own, not the recovered ones
355     for (ReplicationSourceInterface source : this.sources) {
356       source.enqueueLog(newLog);
357     }
358   }
359 
360   /**
361    * Factory method to create a replication source
362    * @param conf the configuration to use
363    * @param fs the file system to use
364    * @param manager the manager to use
365    * @param server the server object for this region server
366    * @param peerId the id of the peer cluster
367    * @return the created source
368    * @throws IOException
369    */
370   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
371       final FileSystem fs, final ReplicationSourceManager manager,
372       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
373       final Server server, final String peerId, final UUID clusterId,
374       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
375           throws IOException {
376     RegionServerCoprocessorHost rsServerHost = null;
377     if (server instanceof HRegionServer) {
378       rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
379     }
380     ReplicationSourceInterface src;
381     try {
382       @SuppressWarnings("rawtypes")
383       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
384           ReplicationSource.class.getCanonicalName()));
385       src = (ReplicationSourceInterface) c.newInstance();
386     } catch (Exception e) {
387       LOG.warn("Passed replication source implementation throws errors, " +
388           "defaulting to ReplicationSource", e);
389       src = new ReplicationSource();
390     }
391 
392     ReplicationEndpoint replicationEndpoint = null;
393     try {
394       String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
395       if (replicationEndpointImpl == null) {
396         // Default to HBase inter-cluster replication endpoint
397         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
398       }
399       @SuppressWarnings("rawtypes")
400       Class c = Class.forName(replicationEndpointImpl);
401       replicationEndpoint = (ReplicationEndpoint) c.newInstance();
402       if(rsServerHost != null) {
403         ReplicationEndpoint newReplicationEndPoint = rsServerHost
404             .postCreateReplicationEndPoint(replicationEndpoint);
405         if(newReplicationEndPoint != null) {
406           // Override the newly created endpoint from the hook with configured end point
407           replicationEndpoint = newReplicationEndPoint;
408         }
409       }
410     } catch (Exception e) {
411       LOG.warn("Passed replication endpoint implementation throws errors", e);
412       throw new IOException(e);
413     }
414 
415     MetricsSource metrics = new MetricsSource(peerId);
416     // init replication source
417     src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
418       clusterId, replicationEndpoint, metrics);
419 
420     // init replication endpoint
421     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
422       fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
423 
424     return src;
425   }
426 
427   /**
428    * Transfer all the queues of the specified to this region server.
429    * First it tries to grab a lock and if it works it will move the
430    * znodes and finally will delete the old znodes.
431    *
432    * It creates one old source for any type of source of the old rs.
433    * @param rsZnode
434    */
435   private void transferQueues(String rsZnode) {
436     NodeFailoverWorker transfer =
437         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
438             this.clusterId);
439     try {
440       this.executor.execute(transfer);
441     } catch (RejectedExecutionException ex) {
442       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
443     }
444   }
445 
446   /**
447    * Clear the references to the specified old source
448    * @param src source to clear
449    */
450   public void closeRecoveredQueue(ReplicationSourceInterface src) {
451     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
452     this.oldsources.remove(src);
453     deleteSource(src.getPeerClusterZnode(), false);
454     this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
455   }
456 
457   /**
458    * Thie method first deletes all the recovered sources for the specified
459    * id, then deletes the normal source (deleting all related data in ZK).
460    * @param id The id of the peer cluster
461    */
462   public void removePeer(String id) {
463     LOG.info("Closing the following queue " + id + ", currently have "
464         + sources.size() + " and another "
465         + oldsources.size() + " that were recovered");
466     String terminateMessage = "Replication stream was removed by a user";
467     ReplicationSourceInterface srcToRemove = null;
468     List<ReplicationSourceInterface> oldSourcesToDelete =
469         new ArrayList<ReplicationSourceInterface>();
470     // First close all the recovered sources for this peer
471     for (ReplicationSourceInterface src : oldsources) {
472       if (id.equals(src.getPeerClusterId())) {
473         oldSourcesToDelete.add(src);
474       }
475     }
476     for (ReplicationSourceInterface src : oldSourcesToDelete) {
477       src.terminate(terminateMessage);
478       closeRecoveredQueue((src));
479     }
480     LOG.info("Number of deleted recovered sources for " + id + ": "
481         + oldSourcesToDelete.size());
482     // Now look for the one on this cluster
483     for (ReplicationSourceInterface src : this.sources) {
484       if (id.equals(src.getPeerClusterId())) {
485         srcToRemove = src;
486         break;
487       }
488     }
489     if (srcToRemove == null) {
490       LOG.error("The queue we wanted to close is missing " + id);
491       return;
492     }
493     srcToRemove.terminate(terminateMessage);
494     this.sources.remove(srcToRemove);
495     deleteSource(id, true);
496   }
497 
498   @Override
499   public void regionServerRemoved(String regionserver) {
500     transferQueues(regionserver);
501   }
502 
503   @Override
504   public void peerRemoved(String peerId) {
505     removePeer(peerId);
506   }
507 
508   @Override
509   public void peerListChanged(List<String> peerIds) {
510     for (String id : peerIds) {
511       try {
512         boolean added = this.replicationPeers.peerAdded(id);
513         if (added) {
514           addSource(id);
515         }
516       } catch (Exception e) {
517         LOG.error("Error while adding a new peer", e);
518       }
519     }
520   }
521 
522   /**
523    * Class responsible to setup new ReplicationSources to take care of the
524    * queues from dead region servers.
525    */
526   class NodeFailoverWorker extends Thread {
527 
528     private String rsZnode;
529     private final ReplicationQueues rq;
530     private final ReplicationPeers rp;
531     private final UUID clusterId;
532 
533     /**
534      *
535      * @param rsZnode
536      */
537     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
538         final ReplicationPeers replicationPeers, final UUID clusterId) {
539       super("Failover-for-"+rsZnode);
540       this.rsZnode = rsZnode;
541       this.rq = replicationQueues;
542       this.rp = replicationPeers;
543       this.clusterId = clusterId;
544     }
545 
546     @Override
547     public void run() {
548       if (this.rq.isThisOurZnode(rsZnode)) {
549         return;
550       }
551       // Wait a bit before transferring the queues, we may be shutting down.
552       // This sleep may not be enough in some cases.
553       try {
554         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
555       } catch (InterruptedException e) {
556         LOG.warn("Interrupted while waiting before transferring a queue.");
557         Thread.currentThread().interrupt();
558       }
559       // We try to lock that rs' queue directory
560       if (server.isStopped()) {
561         LOG.info("Not transferring queue since we are shutting down");
562         return;
563       }
564       SortedMap<String, SortedSet<String>> newQueues = null;
565 
566       newQueues = this.rq.claimQueues(rsZnode);
567 
568       // Copying over the failed queue is completed.
569       if (newQueues.isEmpty()) {
570         // We either didn't get the lock or the failed region server didn't have any outstanding
571         // HLogs to replicate, so we are done.
572         return;
573       }
574 
575       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
576         String peerId = entry.getKey();
577         try {
578           // there is not an actual peer defined corresponding to peerId for the failover.
579           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
580           String actualPeerId = replicationQueueInfo.getPeerId();
581           ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
582           ReplicationPeerConfig peerConfig = null;
583           try {
584             peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
585           } catch (ReplicationException ex) {
586             LOG.warn("Received exception while getting replication peer config, skipping replay"
587                 + ex);
588           }
589           if (peer == null || peerConfig == null) {
590             LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
591             continue;
592           }
593 
594           ReplicationSourceInterface src =
595               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
596                 server, peerId, this.clusterId, peerConfig, peer);
597           if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
598             src.terminate("Recovered queue doesn't belong to any current peer");
599             break;
600           }
601           oldsources.add(src);
602           SortedSet<String> hlogsSet = entry.getValue();
603           for (String hlog : hlogsSet) {
604             src.enqueueLog(new Path(oldLogDir, hlog));
605           }
606           src.startup();
607           hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
608         } catch (IOException e) {
609           // TODO manage it
610           LOG.error("Failed creating a source", e);
611         }
612       }
613     }
614   }
615 
616   /**
617    * Get the directory where hlogs are archived
618    * @return the directory where hlogs are archived
619    */
620   public Path getOldLogDir() {
621     return this.oldLogDir;
622   }
623 
624   /**
625    * Get the directory where hlogs are stored by their RSs
626    * @return the directory where hlogs are stored by their RSs
627    */
628   public Path getLogDir() {
629     return this.logDir;
630   }
631 
632   /**
633    * Get the handle on the local file system
634    * @return Handle on the local file system
635    */
636   public FileSystem getFs() {
637     return this.fs;
638   }
639 
640   /**
641    * Get a string representation of all the sources' metrics
642    */
643   public String getStats() {
644     StringBuffer stats = new StringBuffer();
645     for (ReplicationSourceInterface source : sources) {
646       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
647       stats.append(source.getStats() + "\n");
648     }
649     for (ReplicationSourceInterface oldSource : oldsources) {
650       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
651       stats.append(oldSource.getStats()+ "\n");
652     }
653     return stats.toString();
654   }
655 }