View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.SortedMap;
30  import java.util.SortedSet;
31  import java.util.TreeSet;
32  import java.util.UUID;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.CopyOnWriteArrayList;
35  import java.util.concurrent.LinkedBlockingQueue;
36  import java.util.concurrent.RejectedExecutionException;
37  import java.util.concurrent.ThreadPoolExecutor;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.TableDescriptors;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.hbase.regionserver.HRegionServer;
49  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
50  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
51  import org.apache.hadoop.hbase.replication.ReplicationException;
52  import org.apache.hadoop.hbase.replication.ReplicationListener;
53  import org.apache.hadoop.hbase.replication.ReplicationPeer;
54  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
55  import org.apache.hadoop.hbase.replication.ReplicationPeers;
56  import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
57  import org.apache.hadoop.hbase.replication.ReplicationQueues;
58  import org.apache.hadoop.hbase.replication.ReplicationTracker;
59  
60  import com.google.common.util.concurrent.ThreadFactoryBuilder;
61  
62  /**
63   * This class is responsible to manage all the replication
64   * sources. There are two classes of sources:
65   * <ul>
66   * <li> Normal sources are persistent and one per peer cluster</li>
67   * <li> Old sources are recovered from a failed region server and our
68   * only goal is to finish replicating the WAL queue it had up in ZK</li>
69   * </ul>
70   *
71   * When a region server dies, this class uses a watcher to get notified and it
72   * tries to grab a lock in order to transfer all the queues in a local
73   * old source.
74   *
75   * This class implements the ReplicationListener interface so that it can track changes in
76   * replication state.
77   */
78  @InterfaceAudience.Private
79  public class ReplicationSourceManager implements ReplicationListener {
80    private static final Log LOG =
81        LogFactory.getLog(ReplicationSourceManager.class);
82    // List of all the sources that read this RS's logs
83    private final List<ReplicationSourceInterface> sources;
84    // List of all the sources we got from died RSs
85    private final List<ReplicationSourceInterface> oldsources;
86    private final ReplicationQueues replicationQueues;
87    private final ReplicationTracker replicationTracker;
88    private final ReplicationPeers replicationPeers;
89    // UUID for this cluster
90    private final UUID clusterId;
91    // All about stopping
92    private final Server server;
93    // All logs we are currently tracking
94    private final Map<String, SortedSet<String>> walsById;
95    // Logs for recovered sources we are currently tracking
96    private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
97    private final Configuration conf;
98    private final FileSystem fs;
99    // The path to the latest log we saw, for new coming sources
100   private Path latestPath;
101   // Path to the wals directories
102   private final Path logDir;
103   // Path to the wal archive
104   private final Path oldLogDir;
105   // The number of ms that we wait before moving znodes, HBASE-3596
106   private final long sleepBeforeFailover;
107   // Homemade executer service for replication
108   private final ThreadPoolExecutor executor;
109 
110   private final Random rand;
111 
112 
113   /**
114    * Creates a replication manager and sets the watch on all the other registered region servers
115    * @param replicationQueues the interface for manipulating replication queues
116    * @param replicationPeers
117    * @param replicationTracker
118    * @param conf the configuration to use
119    * @param server the server for this region server
120    * @param fs the file system to use
121    * @param logDir the directory that contains all wal directories of live RSs
122    * @param oldLogDir the directory where old logs are archived
123    * @param clusterId
124    */
125   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
126       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
127       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
128       final Path oldLogDir, final UUID clusterId) {
129     //CopyOnWriteArrayList is thread-safe.
130     //Generally, reading is more than modifying.
131     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
132     this.replicationQueues = replicationQueues;
133     this.replicationPeers = replicationPeers;
134     this.replicationTracker = replicationTracker;
135     this.server = server;
136     this.walsById = new HashMap<String, SortedSet<String>>();
137     this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
138     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
139     this.conf = conf;
140     this.fs = fs;
141     this.logDir = logDir;
142     this.oldLogDir = oldLogDir;
143     this.sleepBeforeFailover =
144         conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
145     this.clusterId = clusterId;
146     this.replicationTracker.registerListener(this);
147     this.replicationPeers.getAllPeerIds();
148     // It's preferable to failover 1 RS at a time, but with good zk servers
149     // more could be processed at the same time.
150     int nbWorkers = conf.getInt("replication.executor.workers", 1);
151     // use a short 100ms sleep since this could be done inline with a RS startup
152     // even if we fail, other region servers can take care of it
153     this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
154         100, TimeUnit.MILLISECONDS,
155         new LinkedBlockingQueue<Runnable>());
156     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
157     tfb.setNameFormat("ReplicationExecutor-%d");
158     tfb.setDaemon(true);
159     this.executor.setThreadFactory(tfb.build());
160     this.rand = new Random();
161   }
162 
163   /**
164    * Provide the id of the peer and a log key and this method will figure which
165    * wal it belongs to and will log, for this region server, the current
166    * position. It will also clean old logs from the queue.
167    * @param log Path to the log currently being replicated from
168    * replication status in zookeeper. It will also delete older entries.
169    * @param id id of the peer cluster
170    * @param position current location in the log
171    * @param queueRecovered indicates if this queue comes from another region server
172    * @param holdLogInZK if true then the log is retained in ZK
173    */
174   public void logPositionAndCleanOldLogs(Path log, String id, long position,
175       boolean queueRecovered, boolean holdLogInZK) {
176     String fileName = log.getName();
177     this.replicationQueues.setLogPosition(id, fileName, position);
178     if (holdLogInZK) {
179      return;
180     }
181     cleanOldLogs(fileName, id, queueRecovered);
182   }
183 
184   /**
185    * Cleans a log file and all older files from ZK. Called when we are sure that a
186    * log file is closed and has no more entries.
187    * @param key Path to the log
188    * @param id id of the peer cluster
189    * @param queueRecovered Whether this is a recovered queue
190    */
191   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
192     if (queueRecovered) {
193       SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
194       if (wals != null && !wals.first().equals(key)) {
195         cleanOldLogs(wals, key, id);
196       }
197     } else {
198       synchronized (this.walsById) {
199         SortedSet<String> wals = walsById.get(id);
200         if (!wals.first().equals(key)) {
201           cleanOldLogs(wals, key, id);
202         }
203       }
204     }
205  }
206 
207   private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
208     SortedSet<String> walSet = wals.headSet(key);
209     LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
210     for (String wal : walSet) {
211       this.replicationQueues.removeLog(id, wal);
212     }
213     walSet.clear();
214   }
215 
216   /**
217    * Adds a normal source per registered peer cluster and tries to process all
218    * old region server wal queues
219    */
220   protected void init() throws IOException, ReplicationException {
221     for (String id : this.replicationPeers.getPeerIds()) {
222       addSource(id);
223     }
224     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
225     if (currentReplicators == null || currentReplicators.size() == 0) {
226       return;
227     }
228     List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
229     LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
230         + otherRegionServers);
231 
232     // Look if there's anything to process after a restart
233     for (String rs : currentReplicators) {
234       if (!otherRegionServers.contains(rs)) {
235         transferQueues(rs);
236       }
237     }
238   }
239 
240   /**
241    * Add a new normal source to this region server
242    * @param id the id of the peer cluster
243    * @return the source that was created
244    * @throws IOException
245    */
246   protected ReplicationSourceInterface addSource(String id) throws IOException,
247       ReplicationException {
248     ReplicationPeerConfig peerConfig
249       = replicationPeers.getReplicationPeerConfig(id);
250     ReplicationPeer peer = replicationPeers.getPeer(id);
251     ReplicationSourceInterface src =
252         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
253           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
254     synchronized (this.walsById) {
255       this.sources.add(src);
256       this.walsById.put(id, new TreeSet<String>());
257       // Add the latest wal to that source's queue
258       if (this.latestPath != null) {
259         String name = this.latestPath.getName();
260         this.walsById.get(id).add(name);
261         try {
262           this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
263         } catch (ReplicationException e) {
264           String message =
265               "Cannot add log to queue when creating a new source, queueId="
266                   + src.getPeerClusterZnode() + ", filename=" + name;
267           server.stop(message);
268           throw e;
269         }
270         src.enqueueLog(this.latestPath);
271       }
272     }
273     src.startup();
274     return src;
275   }
276 
277   /**
278    * Delete a complete queue of wals associated with a peer cluster
279    * @param peerId Id of the peer cluster queue of wals to delete
280    */
281   public void deleteSource(String peerId, boolean closeConnection) {
282     this.replicationQueues.removeQueue(peerId);
283     if (closeConnection) {
284       this.replicationPeers.peerRemoved(peerId);
285     }
286   }
287 
288   /**
289    * Terminate the replication on this region server
290    */
291   public void join() {
292     this.executor.shutdown();
293     if (this.sources.size() == 0) {
294       this.replicationQueues.removeAllQueues();
295     }
296     for (ReplicationSourceInterface source : this.sources) {
297       source.terminate("Region server is closing");
298     }
299   }
300 
301   /**
302    * Get a copy of the wals of the first source on this rs
303    * @return a sorted set of wal names
304    */
305   protected Map<String, SortedSet<String>> getWALs() {
306     return Collections.unmodifiableMap(walsById);
307   }
308 
309   /**
310    * Get a copy of the wals of the recovered sources on this rs
311    * @return a sorted set of wal names
312    */
313   protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
314     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
315   }
316 
317   /**
318    * Get a list of all the normal sources of this rs
319    * @return lis of all sources
320    */
321   public List<ReplicationSourceInterface> getSources() {
322     return this.sources;
323   }
324 
325   /**
326    * Get a list of all the old sources of this rs
327    * @return list of all old sources
328    */
329   public List<ReplicationSourceInterface> getOldSources() {
330     return this.oldsources;
331   }
332 
333   void preLogRoll(Path newLog) throws IOException {
334     synchronized (this.walsById) {
335       String name = newLog.getName();
336       for (ReplicationSourceInterface source : this.sources) {
337         try {
338           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
339         } catch (ReplicationException e) {
340           throw new IOException("Cannot add log to replication queue with id="
341               + source.getPeerClusterZnode() + ", filename=" + name, e);
342         }
343       }
344       for (SortedSet<String> wals : this.walsById.values()) {
345         if (this.sources.isEmpty()) {
346           // If there's no slaves, don't need to keep the old wals since
347           // we only consider the last one when a new slave comes in
348           wals.clear();
349         }
350         wals.add(name);
351       }
352     }
353 
354     this.latestPath = newLog;
355   }
356 
357   void postLogRoll(Path newLog) throws IOException {
358     // This only updates the sources we own, not the recovered ones
359     for (ReplicationSourceInterface source : this.sources) {
360       source.enqueueLog(newLog);
361     }
362   }
363 
364   /**
365    * Factory method to create a replication source
366    * @param conf the configuration to use
367    * @param fs the file system to use
368    * @param manager the manager to use
369    * @param server the server object for this region server
370    * @param peerId the id of the peer cluster
371    * @return the created source
372    * @throws IOException
373    */
374   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
375       final FileSystem fs, final ReplicationSourceManager manager,
376       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
377       final Server server, final String peerId, final UUID clusterId,
378       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
379           throws IOException {
380     RegionServerCoprocessorHost rsServerHost = null;
381     TableDescriptors tableDescriptors = null;
382     if (server instanceof HRegionServer) {
383       rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
384       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
385     }
386     ReplicationSourceInterface src;
387     try {
388       @SuppressWarnings("rawtypes")
389       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
390           ReplicationSource.class.getCanonicalName()));
391       src = (ReplicationSourceInterface) c.newInstance();
392     } catch (Exception e) {
393       LOG.warn("Passed replication source implementation throws errors, " +
394           "defaulting to ReplicationSource", e);
395       src = new ReplicationSource();
396     }
397 
398     ReplicationEndpoint replicationEndpoint = null;
399     try {
400       String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
401       if (replicationEndpointImpl == null) {
402         // Default to HBase inter-cluster replication endpoint
403         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
404       }
405       @SuppressWarnings("rawtypes")
406       Class c = Class.forName(replicationEndpointImpl);
407       replicationEndpoint = (ReplicationEndpoint) c.newInstance();
408       if(rsServerHost != null) {
409         ReplicationEndpoint newReplicationEndPoint = rsServerHost
410             .postCreateReplicationEndPoint(replicationEndpoint);
411         if(newReplicationEndPoint != null) {
412           // Override the newly created endpoint from the hook with configured end point
413           replicationEndpoint = newReplicationEndPoint;
414         }
415       }
416     } catch (Exception e) {
417       LOG.warn("Passed replication endpoint implementation throws errors", e);
418       throw new IOException(e);
419     }
420 
421     MetricsSource metrics = new MetricsSource(peerId);
422     // init replication source
423     src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
424       clusterId, replicationEndpoint, metrics);
425 
426     // init replication endpoint
427     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
428       fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
429 
430     return src;
431   }
432 
433   /**
434    * Transfer all the queues of the specified to this region server.
435    * First it tries to grab a lock and if it works it will move the
436    * znodes and finally will delete the old znodes.
437    *
438    * It creates one old source for any type of source of the old rs.
439    * @param rsZnode
440    */
441   private void transferQueues(String rsZnode) {
442     NodeFailoverWorker transfer =
443         new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
444             this.clusterId);
445     try {
446       this.executor.execute(transfer);
447     } catch (RejectedExecutionException ex) {
448       LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
449     }
450   }
451 
452   /**
453    * Clear the references to the specified old source
454    * @param src source to clear
455    */
456   public void closeRecoveredQueue(ReplicationSourceInterface src) {
457     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
458     this.oldsources.remove(src);
459     deleteSource(src.getPeerClusterZnode(), false);
460     this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
461   }
462 
463   /**
464    * Thie method first deletes all the recovered sources for the specified
465    * id, then deletes the normal source (deleting all related data in ZK).
466    * @param id The id of the peer cluster
467    */
468   public void removePeer(String id) {
469     LOG.info("Closing the following queue " + id + ", currently have "
470         + sources.size() + " and another "
471         + oldsources.size() + " that were recovered");
472     String terminateMessage = "Replication stream was removed by a user";
473     ReplicationSourceInterface srcToRemove = null;
474     List<ReplicationSourceInterface> oldSourcesToDelete =
475         new ArrayList<ReplicationSourceInterface>();
476     // First close all the recovered sources for this peer
477     for (ReplicationSourceInterface src : oldsources) {
478       if (id.equals(src.getPeerClusterId())) {
479         oldSourcesToDelete.add(src);
480       }
481     }
482     for (ReplicationSourceInterface src : oldSourcesToDelete) {
483       src.terminate(terminateMessage);
484       closeRecoveredQueue((src));
485     }
486     LOG.info("Number of deleted recovered sources for " + id + ": "
487         + oldSourcesToDelete.size());
488     // Now look for the one on this cluster
489     for (ReplicationSourceInterface src : this.sources) {
490       if (id.equals(src.getPeerClusterId())) {
491         srcToRemove = src;
492         break;
493       }
494     }
495     if (srcToRemove == null) {
496       LOG.error("The queue we wanted to close is missing " + id);
497       return;
498     }
499     srcToRemove.terminate(terminateMessage);
500     this.sources.remove(srcToRemove);
501     deleteSource(id, true);
502   }
503 
504   @Override
505   public void regionServerRemoved(String regionserver) {
506     transferQueues(regionserver);
507   }
508 
509   @Override
510   public void peerRemoved(String peerId) {
511     removePeer(peerId);
512   }
513 
514   @Override
515   public void peerListChanged(List<String> peerIds) {
516     for (String id : peerIds) {
517       try {
518         boolean added = this.replicationPeers.peerAdded(id);
519         if (added) {
520           addSource(id);
521         }
522       } catch (Exception e) {
523         LOG.error("Error while adding a new peer", e);
524       }
525     }
526   }
527 
528   /**
529    * Class responsible to setup new ReplicationSources to take care of the
530    * queues from dead region servers.
531    */
532   class NodeFailoverWorker extends Thread {
533 
534     private String rsZnode;
535     private final ReplicationQueues rq;
536     private final ReplicationPeers rp;
537     private final UUID clusterId;
538 
539     /**
540      *
541      * @param rsZnode
542      */
543     public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
544         final ReplicationPeers replicationPeers, final UUID clusterId) {
545       super("Failover-for-"+rsZnode);
546       this.rsZnode = rsZnode;
547       this.rq = replicationQueues;
548       this.rp = replicationPeers;
549       this.clusterId = clusterId;
550     }
551 
552     @Override
553     public void run() {
554       if (this.rq.isThisOurZnode(rsZnode)) {
555         return;
556       }
557       // Wait a bit before transferring the queues, we may be shutting down.
558       // This sleep may not be enough in some cases.
559       try {
560         Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
561       } catch (InterruptedException e) {
562         LOG.warn("Interrupted while waiting before transferring a queue.");
563         Thread.currentThread().interrupt();
564       }
565       // We try to lock that rs' queue directory
566       if (server.isStopped()) {
567         LOG.info("Not transferring queue since we are shutting down");
568         return;
569       }
570       SortedMap<String, SortedSet<String>> newQueues = null;
571 
572       newQueues = this.rq.claimQueues(rsZnode);
573 
574       // Copying over the failed queue is completed.
575       if (newQueues.isEmpty()) {
576         // We either didn't get the lock or the failed region server didn't have any outstanding
577         // WALs to replicate, so we are done.
578         return;
579       }
580 
581       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
582         String peerId = entry.getKey();
583         try {
584           // there is not an actual peer defined corresponding to peerId for the failover.
585           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
586           String actualPeerId = replicationQueueInfo.getPeerId();
587           ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
588           ReplicationPeerConfig peerConfig = null;
589           try {
590             peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
591           } catch (ReplicationException ex) {
592             LOG.warn("Received exception while getting replication peer config, skipping replay"
593                 + ex);
594           }
595           if (peer == null || peerConfig == null) {
596             LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
597             continue;
598           }
599 
600           ReplicationSourceInterface src =
601               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
602                 server, peerId, this.clusterId, peerConfig, peer);
603           if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
604             src.terminate("Recovered queue doesn't belong to any current peer");
605             break;
606           }
607           oldsources.add(src);
608           SortedSet<String> walsSet = entry.getValue();
609           for (String wal : walsSet) {
610             src.enqueueLog(new Path(oldLogDir, wal));
611           }
612           src.startup();
613           walsByIdRecoveredQueues.put(peerId, walsSet);
614         } catch (IOException e) {
615           // TODO manage it
616           LOG.error("Failed creating a source", e);
617         }
618       }
619     }
620   }
621 
622   /**
623    * Get the directory where wals are archived
624    * @return the directory where wals are archived
625    */
626   public Path getOldLogDir() {
627     return this.oldLogDir;
628   }
629 
630   /**
631    * Get the directory where wals are stored by their RSs
632    * @return the directory where wals are stored by their RSs
633    */
634   public Path getLogDir() {
635     return this.logDir;
636   }
637 
638   /**
639    * Get the handle on the local file system
640    * @return Handle on the local file system
641    */
642   public FileSystem getFs() {
643     return this.fs;
644   }
645 
646   /**
647    * Get a string representation of all the sources' metrics
648    */
649   public String getStats() {
650     StringBuffer stats = new StringBuffer();
651     for (ReplicationSourceInterface source : sources) {
652       stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
653       stats.append(source.getStats() + "\n");
654     }
655     for (ReplicationSourceInterface oldSource : oldsources) {
656       stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
657       stats.append(oldSource.getStats()+ "\n");
658     }
659     return stats.toString();
660   }
661 }